Search

카프카 레이어드 아키텍처

레이어드 아키텍처 기준으로 "깔끔하게 구성해보자" Top-Level Package 를 레이어 기준으로 가져간 경우 /common /config RedisConfig KafkaConfig CacheConfig JpaConfig /interfaces ( "외부" 진입점 ) /api // HTTP 통신 ( 외부 ) /user UserController /concert /reservation /payment /event // 애플리케이션 이벤트 ( 내부 ) - 시적 허용 대상 /reservation ReservationEventListener /payment PaymentEventListener /consumer // 카프카 Consumer ( 외부 ) /payment PaymentMessageConsumer --------------------------------- 비즈니스 레이어 ( "외부" 와 전혀 연관 없는.. 우리 고유 비즈니스 ) /application /user /concert /payment PaymentFacade /domain /user /concert /payment /message PaymentMessage PaymentProducer (IF) PaymentMessageOutboxWriter (IF) /event PaymentEvent PaymentEventPublisher (IF) PaymentService PaymentRepository (IF) ---------------------------------- /infrastructure ( "외부" 통신부 ) /payment PaymentRepositoryImpl /db // JPA , QueryDSL, Mybatis /payment PaymentOutboxEntity PaymentJpaOutboxWriter (구현체) /redis // Redis 관련된 구현체 /kafka // Kafka 관련된 구현체 /payment PaymentKafkaMessageSender (구현체) /spring // CahceManager, ApplicationEvent... <- 스프링 의존성 /payment PaymentSpringEventPublisher (구현체) class PaymentKafkaMessageSender implements PaymentMessageSender { @Autowired private final KafkaTemplate<String, Any> kafkaTemplate; @Value("\${payment.topic-name}" private final String PAYMENT_TOPIC; @Override void send(PaymentMessage message) { kafkaTemplate.send(PAYMENT_TOPIC, message); } } // 아웃박스 마킹 class PaymentMessageConsumer { private final PaymentMessageOutboxWriter paymentMessageOutboxWriter; @KafkaListener( topic = ["\${payment.topic-name}"] ) void complete(PaymentMessage message) { paymentMessageOutboxWriter.complete(message); } } // 이벤트 리스너 class PaymentEventListener { private final PaymentMessageOutboxWriter paymentMessageOutboxWriter; private final PaymentMessageSender paymentMessageSender; // 같은 트랜잭션 안에서 커밋이 함께 되는 걸 보장해야함 @TransactionalEventListener(phase = BEFORE_COMMIT) void createOutboxMessage(PaymentEvent event) { paymentMessageOutboxWriter.save(PaymentMessage(event...)); } // 커밋이 완료되었다면, 메세지를 발행해야 됨 @Async @TransactionalEventListener(phase = AFTER_COMMIT) void sendMessage(PaymentEvent event) { paymentMessageSender.send(PaymentMessage(event...)); } @Async @TransactionalEventListener(phase = AFTER_COMMIT) void saveHistory(PaymentEvent event) { // 해당 유저한테 마일리지 적립 pointFacade.earnMileage(event.userId, event.amount); } } (메세지별 ) Outbox Table id(식별자 - Unique) | message(varchar) | status | created_at | updated_at ObjectMapper.writeToJsonString(PaymentMessage); --> JSON String 으로 만들어줌. read 연산 ObjectMapper.readFrom<PaymentMessage>(paymentOutboxEntity.message); (공용으로 쓰는 경우) Outbox Table id(식별자 - Unique) | message_type | message_id | message(varchar) | status | created_at | updated_at
JavaScript
복사