코딩공작소

MSA(10) - 스트림을 사용한 이벤트 기반 아키텍처 본문

어플리케이션개발/MSA

MSA(10) - 스트림을 사용한 이벤트 기반 아키텍처

안잡아모찌 2024. 7. 25. 20:21

애플레케이션 내 통신을 할 때, 반복되는 데이터 읽기의 서비스인 경우 캐싱이 유효할 수 있다.
이는, 서비스에 대한 호출 응답 시간을 크게 향상시킬 수 있다.

 

캐싱 솔루션의 세 가지 핵심 요구 사항

  1. 캐싱된 데이터는 라이선싱 서비스의 모든 인스턴스에 일관성이 있어야 한다.
  2. 라이선싱 서비스를 호스팅하는 컨테이너 메모리에 조직 데이터를 캐싱하면 안 된다.
  3. 업데이트나 삭제로 조직 레코드가 변경될 때 라이선싱 서비스는 조직 서비스의 상태 변화를 인식해야 한다.

 

이러한 요구 사항을 만족하는 방법 중
조직 서비스가 자기 데이터가 변경되었음을 알리려고 비동기 이벤트를 발송하는 방법이 효과적이다.
이 방법은 조직 서비스의 레코드가 변경되었을 때 메시지를 큐에 발행하고, 중개자(브로커)와 라이선싱 서비스는 조직 이벤트가 발생했는지 확인하고, 발행하면 캐시에서 조직 데이터를 삭제한다.

 

메시징을 사용한 서비스 간 상태 변화 전달

라이선싱 서비스와 조직 서비스 사이에는 토픽이라는 메시징 방식이 추가되며, 조직서비스의 상태 변경에 메시지를 관리한다.

 

이때, 메시지 큐는 중개자 역할을 하며 4 가지 이점을 제공한다.

  1. 느슨한 결합
    조직 서비스는 상태 변화를 발행해야 할 때 메시지 큐에 기록한다. 라이선싱 서비스는 메시지가 수신되었다는 것만 알고 누가 발행했는지는 알지 못한다.
  2. 내구성
    큐가 있으면 서비스 소비자가 다운된 경우에도 메시지 전달을 보장할 수 있다.
  3. 확장성
    속도가 나오지 않아도, 메시지 소비자를 더 많이 가동시켜 큐의 메시지를 처리하게 하는 것이 간편하다.
  4. 유연성
    새로운 소비자의 코드는 발행되는 이벤트를 수신하고 적절히 대응할 수 있다.

 

메시징 아키텍처의 단점

  1. 메시지 처리의 의미론
    애플리케이션이 메시지가 소비되는 순서에 따라 어떻게 동작하는지와 메시지가 순서대로 처리되지 않을 때 어떻게 되는지를 이해해야 한다.
  2. 메시지 가시성
    상관관계 ID는 사용자 트랜잭션의 시작 시점에 생성되어 모든 서비스 호출에 전달되는 고유한 번호이며 발행 및 소비되는 모든 메시지에 포함되어 전달되어야 한다.
  3. 메시지 코레오그래피
    선형적인 방식이 아니기 때문에 비즈니스 로직을 추론하는 것이 어렵고, 여러 다른 서비스의 로그를 모두 살펴보아야 한다.

 

 

스프링 클라우드 스트림

애플리케이션의 메시지 발행자와 소비자를 쉽게 구축할 수 있는 애너테이션 기반 프레임워크다.

메시징을 사용하기 위해서는 두 서비스 관점에서 메시지 발행자와 메시지 소비자가 필요하다.

  • 소스 : 메시지 발행
  • 채널 : 메시지를 보관할 큐를 추상화
  • 바인더 : 특정 메시지 플랫폼과 통신하는 스프링 코드
  • 싱크 : 큐에서 메시지 수신

 

 

간단한 메시지 생산자와 소비자 작성

아파치 카프카 및 레디스 도커 구성

docker-compose.yml 파일에 코드를 추가해야 한다.

 zookeeper:
    image: zookeeper:3.7.0
    container_name: zookeeper
    ports:
      - 2181:2181
    networks:
      backend:
        aliases:
          - "zookeeper"
  kafkaserver:
    image: wurstmeister/kafka:latest
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1,ratings:1:1
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper
    networks:
      backend:
        aliases:
          - "kafka"
  redisserver:
    image: redis:alpine
    container_name: redis
    ports:
      - 6379:6379
    networks:
      backend:
        aliases:
          - "redis"

메시지 생산자 도커 환경에 카프카 및 레디스 서비스를 추가한다.

 

 

<<pom.xml>>

<dependency>
	<groupId>org.springframework.cloud</>
    <artifactId>spring-cloud-stream</>
</>
<dependency>
	<groupId>org.springframework.cloud</>
    <artifactId>spring-cloud-starter-stream-kafka</>
</>

mvc clean package dockerfile:build && docker-compose -f docker/docker-compose.yml up

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.optimagrowth.organization;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
 
@SpringBootApplication
@RefreshScope
@EnableBinding(Source.class) --> 스프링 클라우드 스트림이 메시지 브로커에 애플리케이션을 바인딩하도록 지정
public class OrganizationServiceApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(OrganizationServiceApplication.class, args);
    }
 
}
cs

EnableBinding애너테이션은 클래스에서 정의된 채널들을 이용하여 메시지 브로커와 통신할 것이라고 스트링 클라우드 스트림에 알린다.

 

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.optimagrowth.organization.utils;
 
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
 
@Component
public class UserContext {
    public static final String CORRELATION_ID = "tmx-correlation-id";
    public static final String AUTH_TOKEN     = "Authorization";
    public static final String USER_ID        = "tmx-user-id";
    public static final String ORG_ID         = "tmx-org-id";
 
    private static final ThreadLocal<String> correlationId= new ThreadLocal<String>();
    private static final ThreadLocal<String> authToken= new ThreadLocal<String>();
    private static final ThreadLocal<String> userId = new ThreadLocal<String>();
    private static final ThreadLocal<String> orgId = new ThreadLocal<String>();
 
 
    public static String getCorrelationId() { return correlationId.get(); }
    public static void setCorrelationId(String cid) {correlationId.set(cid);}
 
    public static String getAuthToken() { return authToken.get(); }
    public static void setAuthToken(String aToken) {authToken.set(aToken);}
 
    public static String getUserId() { return userId.get(); }
    public static void setUserId(String aUser) {userId.set(aUser);}
 
    public static String getOrgId() { return orgId.get(); }
    public static void setOrgId(String aOrg) {orgId.set(aOrg);}
 
    public static HttpHeaders getHttpHeaders(){
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.set(CORRELATION_ID, getCorrelationId());
 
        return httpHeaders;
    }
 
}
cs

변수를 스레드 로컬로 만든다. 이렇게 하면 현재 스레드에 대한 데이터를 스레드별로 저장할 수 있다.
여기에 설정된 정보는 그 값을 설정한 스레드만 읽을 수 있다.

 

 

메시지 브로커에 메시지 발행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.optimagrowth.organization.events.source;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
 
import com.optimagrowth.organization.events.model.OrganizationChangeModel;
import com.optimagrowth.organization.utils.ActionEnum;
import com.optimagrowth.organization.utils.UserContext;
 
@Component
public class SimpleSourceBean {
    private Source source;
 
    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);
 
    public SimpleSourceBean(Source source){ --> 서비스에서 사용되는 Source 인터페이스 구현체를 주입
        this.source = source;
    }
 
    public void publishOrganizationChange(ActionEnum action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action.toString(),
                organizationId,
                UserContext.getCorrelationId()); --> 자바 POJO 메시지를 발행한다.
 
        source.output().send(MessageBuilder.withPayload(change).build());--> Source 클래스에서
정의된 채널에서 전달된 메시지를 발송한다.
    }
}
cs

스프링 클라우드 Source 클래스에 코드를 주입했다. 특정 메시지 토픽에 대한 모든 통신은 스프링 클라우드 스트림의 채널이라는 자바 인터페이스 클래스로 한다.

 

 

채널모델 객체 발행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.optimagrowth.organization.events.model;
 
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
 
@Getter @Setter @ToString
public class OrganizationChangeModel {
    private String type;
    private String action;
    private String organizationId;
    private String correlationId;
 
    public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
        super();
        this.type = type;
        this.action = action;
        this.organizationId = organizationId;
        this.correlationId = correlationId;
    }
}
cs
  • action : 이벤트를 발생시킨 액션
  • organizationId : 이벤트와 연관된 조직 ID
  • correlationId : 이벤트를 발생시킨 서비스 호출의 상관관계 ID ( 디버깅에 필수 )

 

조직 서비스에서 메시지 발행하기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.optimagrowth.organization.service;
 
import java.util.Optional;
import java.util.UUID;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import com.optimagrowth.organization.events.source.SimpleSourceBean;
import com.optimagrowth.organization.model.Organization;
import com.optimagrowth.organization.repository.OrganizationRepository;
import com.optimagrowth.organization.utils.ActionEnum;
 
import brave.ScopedSpan;
import brave.Tracer;
 
@Service
public class OrganizationService {
    
    private static final Logger logger = LoggerFactory.getLogger(OrganizationService.class);
    
    @Autowired
    private OrganizationRepository repository;
    
    @Autowired
   SimpleSourceBean simpleSourceBean;--> 조직 서비스에 SimpleSourceBean을 주입하려고 자동연결
    
    @Autowired
    Tracer tracer;
 
    public Organization findById(String organizationId) {
        Optional<Organization> opt = null;
        ScopedSpan newSpan = tracer.startScopedSpan("getOrgDBCall");
        try {
        opt = repository.findById(organizationId);
        simpleSourceBean.publishOrganizationChange(ActionEnum.GET, organizationId);
        if (!opt.isPresent()) {
            String message = String.format("Unable to find an organization with the Organization id %s", organizationId);
            logger.error(message);
            throw new IllegalArgumentException(message);    
        }
        logger.debug("Retrieving Organization Info: " + opt.get().toString());
        }finally {
            newSpan.tag("peer.service""postgres");
            newSpan.annotate("Client received");
            newSpan.finish();
        }
        return opt.get();
    }    
 
    public Organization create(Organization organization){
        organization.setId( UUID.randomUUID().toString());
        organization = repository.save(organization);
        simpleSourceBean.publishOrganizationChange(ActionEnum.CREATED, organization.getId());
--> 조직 데이터를 변경하는 모든 메서드는 simpleSourceBean, publishOrganizationChange()를 호출
        return organization;
 
    }
 
    public void update(Organization organization){
        repository.save(organization);
        simpleSourceBean.publishOrganizationChange(ActionEnum.UPDATED, organization.getId());
    }
 
    public void delete(String organizationId){
        repository.deleteById(organizationId);
        simpleSourceBean.publishOrganizationChange(ActionEnum.DELETED, organizationId);
    }
    
    @SuppressWarnings("unused")
    private void sleep(){
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            logger.error(e.getMessage());
        }
    }
}
cs

 

 

 

라이선싱 서비스에서 메시지 소비자 작성

스프링 클라우드 스트림을 사용하는 서비스가 어떻게 서비스를 소비하는지 확인해보자
일단, pom.xml 파일에 의존성을 추가해야 한다.

<dependency>
	<groupId>org.springframework.cloud</>
    <artifactId>spring-cloud-stream</>
</>
<dependency>
	<groupId>org.springframework.cloud</>
    <artifactId>spring-cloud-starter-stream-kafka</>
</dependency>

 

package com...~

@EnableBinding(Sink.class) --> 유입되는 메시지를 수신하고자 Sink 인터페이스에 정의된 채널을 사용하도록 서비스를 설정
public class LicenseServiceApplicatiopn {
	@StreamListener(Sink.INPUT) --> 입력 채널에서 메시지를 받을 때마다 이 메서드를 실행한다.
    public void loggerSink(...){...}
}

라이선싱 서비스와 조직 서비스 간 차이는 @EnableBinding에 전달하는 값이 다르다는 것이다.

 

 

메시지 서비스 동작 보기

조직 서비스는 레코드가 추가, 수정, 삭제될 때마다 orgChangeTopic에 메시지를 발행하고 라이선싱 서비스는 그 토픽에서 메시지를 수신한다.
즉, 스프링 클라우드 스트림은 이 서비스 사이에서 중개자 역할을 한다.

 

 

  • 메시징을 사용한 비동기 통신은 마이크로서비스 아키텍처에서 중요한 부분이다
  • 애플리케이션에서 메시징을 사용하면 서비스를 확장하고 결함 내성을 높일 수 있다
  • 스프링 클라우드 스트림은 간단한 애너테이션을 사용하고 하부 메시징 플랫폼별 세부 정보를 추상화하여 메시지 생성 및 소비를 단순화한다.
  • 메시지 소스(source)는 메시지 브로커 큐에 메시지를 방행하는 애너테이션이 추가된 자바 메서드
  • 메시지 싱크(sink)는 메시지 브로커 큐에서 메시지를 수신하는 애너테이션이 추가된 자바 메서드
  • 레디스는 데이터베이스와 캐시로 모두 사용될 수 있는 키-값 저장소이다.