Java

[MSA] e-commerce API; 구현2.5 - notification(kafka, email)

ride-dev 2024. 6. 21. 21:21

[MSA] e-commerce API; 프로젝트 흐름

[MSA] e-commerce API; 구현1 - 설정(config-server, eureka-server, gateway)

[MSA] e-commerce API; 구현2 - customer, product, payment, order, notification

(앞선 게시글에 작성한 내용은 생략할 예정입니다)

 

작성한 ERD를 참조하여 구현합니다.

1. Notification

MongoDB의 알림(notification)에 저장할 데이터는 아래와 같습니다.

1. 고객(customer)

2. 고객(customer)의 확정 주문(order) 데이터

3. 고객(customer)의 확정 지불(payment) 데이터

Kafka를 활용하여 주문, 지불에 대한 이벤트 메시지는 비동기(Async)형식으로 처리합니다.

알림(Notification)이 주문과 지불에 대한 이벤트 메시지를 수신하면,

순서에 관계 없이 DB에 저장하고 SMTP를 통해 Mail을 보냅니다.

 

MongoDB의 Notification은 아래와 같습니다.

paymentConfirmation이 저장되면, orderConfirmation은 null로 저장됩니다.

아래는 notification프로젝트의 의존성입니다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
    implementation 'org.springframework.boot:spring-boot-starter-mail'
    implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
    implementation 'org.springframework.cloud:spring-cloud-starter-config'
    implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'io.micrometer:micrometer-tracing-bridge-brave'
    implementation 'io.zipkin.reporter2:zipkin-reporter-brave'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

spring-boot-starter-mail 과 thymleaf 의존성을 사용하여 메일, 메일 페이지를 구현했습니다.

notification의 설정파일인 application.yml의 구성은 아래와 같습니다.

spring:
  application:
    name: notification-service
  config:
    import: optional:configserver:http://localhost:8888

config-server의 notification-service.yml은 아래와 같습니다.

server:
  port: 8040
spring:
  data:
    mongodb:
      username: ride
      password: password
      host: localhost
      port: 27017
      database: notification
      authentication-database: admin
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: paymentGroup,orderGroup
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: '*' # com.ride.ecommerce
        spring.json.type.mapping: orderConfirmation:com.ride.ecommerce.kafka.order.OrderConfirmation,paymentConfirmation:com.ride.ecommerce.kafka.payment.PaymentConfirmation
  mail:
    host: localhost
    port: 1025
    username: ride
    password: ride
    properties:
      mail:
        smtp:
          trust: "*"
        auth: true
        starttls:
          enabled: true
        connectiontimeout: 5000
        timeout: 3000
        writetimeout: 5000

spring.mail.host 로 호스트를 localhost로 설정했습니다.

spring.mail.port 로 메일서버의 포트를 1025로 설정했습니다.

username, password 로 메일서버 인증 시 사용할 사용자 정보를 지정합니다.

spring.mail.properties.mail.stmp.trust 설정을 통해 접근 제어 설정을 합니다.

spring.mail.properties.mail.auth 에 true 설정을 하는 것으로 SMTP 서버에 인증이 필요함을 나타냅니다.

spring.mail.properties.mail.starttls.enabled 설정을 통해,

starttls를 사용하여 메일을 보낼 때 보안연결을 설정할 수 있도록 합니다.

timeout 설정을 통해 전송 시 시간(밀리초) 초과 설정을 합니다.

writetimeout 설정을 통해 이메일 쓰기 작업의 시간(밀리초) 초과 설정을 합니다.

1.0 디렉토리 구조

notification의 디렉토리 구조는 아래와 같습니다.

package com.ride.ecommerce;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class NotificationApplication {

    public static void main(String[] args) {
       SpringApplication.run(NotificationApplication.class, args);
    }

}

main class에 @EnableAsyc 어노테이션을 사용하여 비동기 처리를 활성화합니다.

1.1 notification

1.1.1 Notification class, NotificationType enum

package com.ride.ecommerce.notification;

import com.ride.ecommerce.kafka.order.OrderConfirmation;
import com.ride.ecommerce.kafka.payment.PaymentConfirmation;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Getter
@Setter
@Document
public class Notification {

    @Id
    private String id;
    private NotificationType type;
    private LocalDateTime notificationDate;
    private OrderConfirmation orderConfirmation;
    private PaymentConfirmation paymentConfirmation;
}

각 Confirmation은 Kafka 이벤트에 대한 필드입니다.

주문, 지불에 대한 이벤트를 다룹니다.

package com.ride.ecommerce.notification;

public enum NotificationType {
    ORDER_CONFIRMATION,
    PAYMENT_CONFIRMATION
}

1.1.2 NotificationRepository interface

package com.ride.ecommerce.notification;

import org.springframework.data.mongodb.repository.MongoRepository;

public interface NotificationRepository extends MongoRepository<Notification, String> {
}

1.2 email

1.2.1 EmailService

package com.ride.ecommerce.email;

import com.ride.ecommerce.kafka.order.Product;
import jakarta.mail.MessagingException;
import jakarta.mail.internet.MimeMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.thymeleaf.context.Context;
import org.thymeleaf.spring6.SpringTemplateEngine;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.ride.ecommerce.email.EmailTemplates.PAYMENT_CONFIRMATION;
import static com.ride.ecommerce.email.EmailTemplates.ORDER_CONFIRMATION;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.springframework.mail.javamail.MimeMessageHelper.MULTIPART_MODE_MIXED_RELATED;

/**
 * Service class for sending emails related to payment success and order confirmation.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class EmailService {

    private final JavaMailSender mailSender;
    private final SpringTemplateEngine templateEngine;


    /**
     * Sends an email to confirm successful payment.
     *
     * @param destinationEmail the recipient's email address
     * @param customerName the customer's name
     * @param amount the payment amount
     * @param orderReference the order reference ID
     * @throws MessagingException if an error occurs while sending the email
     */
    @Async
    public void sendPaymentSuccessEmail(
            String destinationEmail,
            String customerName,
            BigDecimal amount,
            String orderReference
    ) throws MessagingException {
        MimeMessage mimemessage = mailSender.createMimeMessage();
        MimeMessageHelper messageHelper =
                new MimeMessageHelper(mimemessage, MULTIPART_MODE_MIXED_RELATED, UTF_8.name());
        messageHelper.setFrom("123or@naver.com");

        final String templateName = PAYMENT_CONFIRMATION.getTemplate();

        Map<String, Object> variables = new HashMap<>();
        variables.put("customerName", customerName);
        variables.put("amount", amount);
        variables.put("orderReference", orderReference);

        Context context = new Context();
        context.setVariables(variables);
        messageHelper.setSubject(PAYMENT_CONFIRMATION.getSubject());

        try {
            String htmlTemplate = templateEngine.process(templateName, context);
            messageHelper.setText(htmlTemplate, true);

            messageHelper.setTo(destinationEmail);
            mailSender.send(mimemessage);
            log.info(String.format("INFO - Email successfully sent to %s with template %s ", destinationEmail, templateName));
        } catch (MessagingException e) {
            log.warn("WARNING - Cannot send email to {}", destinationEmail);
        }
    }

    /**
     * Sends an email to confirm order details.
     *
     * @param destinationEmail the recipient's email address
     * @param customerName the customer's name
     * @param amount the total order amount
     * @param orderReference the order reference ID
     * @param products the list of products in the order
     * @throws MessagingException if an error occurs while sending the email
     */
    @Async
    public void sendOrderConfirmationEmail(
            String destinationEmail,
            String customerName,
            BigDecimal amount,
            String orderReference,
            List<Product> products
    ) throws MessagingException {
        MimeMessage mimemessage = mailSender.createMimeMessage();
        MimeMessageHelper messageHelper =
                new MimeMessageHelper(mimemessage, MULTIPART_MODE_MIXED_RELATED, UTF_8.name());
        messageHelper.setFrom("123or@naver.com");

        final String templateName = ORDER_CONFIRMATION.getTemplate();

        Map<String, Object> variables = new HashMap<>();
        variables.put("customerName", customerName);
        variables.put("totalAmount", amount);
        variables.put("orderReference", orderReference);
        variables.put("products", products);

        Context context = new Context();
        context.setVariables(variables);
        messageHelper.setSubject(ORDER_CONFIRMATION.getSubject());

        try {
            String htmlTemplate = templateEngine.process(templateName, context);
            messageHelper.setText(htmlTemplate, true);

            messageHelper.setTo(destinationEmail);
            mailSender.send(mimemessage);
            log.info(String.format("INFO - Email successfully sent to %s with template %s ", destinationEmail, templateName));
        } catch (MessagingException e) {
            log.warn("WARNING - Cannot send email to {}", destinationEmail);
        }
    }
}
private final JavaMailSender mailSender;
private final SpringTemplateEngine templateEngine;

mail을 보낼 수 있도록 spring-mail관련 Bean을 주입합니다.

또한, 각 메일(order, payment)에 대한 양식을 다르게 할 수 있도록 의존성을 주입하여 사용합니다.

@Async

@Async는 비동기 처리를 위한 어노테이션입니다.

payment 성공 시 이메일을 보내는 메서드의 매개변수에는

수신자 이메일 및 이름, 총액, order참조값을 입력 받고,

mail을 보낼 html에 주입할 수 있도록 Map에 담습니다.

Map<String, Object> variables = new HashMap<>();
variables.put("customerName", customerName);
variables.put("amount", amount);
variables.put("orderReference", orderReference);

아래처럼  메일 서식에 대한 설정을 합니다.

MimeMessage mimemessage = mailSender.createMimeMessage();
MimeMessageHelper messageHelper =
        new MimeMessageHelper(mimemessage, MULTIPART_MODE_MIXED_RELATED, UTF_8.name());
messageHelper.setFrom("123or@naver.com");
final String templateName = PAYMENT_CONFIRMATION.getTemplate();

위처럼 payment메일을 보낼 html을 지정합니다.

Context context = new Context();
context.setVariables(variables);
messageHelper.setSubject(PAYMENT_CONFIRMATION.getSubject());

try {
    String htmlTemplate = templateEngine.process(templateName, context);
    messageHelper.setText(htmlTemplate, true);

    messageHelper.setTo(destinationEmail);
    mailSender.send(mimemessage);
    log.info(String.format("INFO - Email successfully sent to %s with template %s ", destinationEmail, templateName));
} catch (MessagingException e) {
    log.warn("WARNING - Cannot send email to {}", destinationEmail);
}

html에 값을 넣고, 메일을 보냅니다.

 

아래처럼 mailSender에 오류문구가 생길 수 있습니다.

Could not autowire. No beans of 'JavaMailSender' type found.

프로젝트 내 application.yml에 mail 관련 설정이 존재하지 않을 때 발생합니다.

config-server의 yml에 설정이 존재한다면,

위 오류 문구가 출력되어도 문제 없이 동작합니다.

1.2.2 EmailTemplate

package com.ride.ecommerce.email;

import lombok.Getter;

public enum EmailTemplates {
    PAYMENT_CONFIRMATION("payment-confirmation.html", "Payment successfully processed"),
    ORDER_CONFIRMATION("order-confirmation.html", "Order confirmation");
    @Getter
    private final String template;
    @Getter
    private final String subject;

    EmailTemplates(String template, String subject) {
        this.template = template;
        this.subject = subject;
    }
}

1.3 templates

이메일 양식입니다.

1.3.1 order-confirmation.html

더보기
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Order Details</title>

    <style>
        body {
            font-family: Arial, sans-serif;
            line-height: 1.6;
            background-color: #f4f4f4;
            margin: 0;
            padding: 0;
        }

        .container {
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
            background-color: #fff;
            border-radius: 8px;
            box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
        }

        h1 {
            color: #333;
        }

        table {
            width: 100%;
            border-collapse: collapse;
            margin-top: 20px;
        }

        th, td {
            padding: 12px;
            border: 1px solid #ddd;
            text-align: left;
        }

        th {
            background-color: #007BFF;
            color: #fff;
        }

        .footer {
            margin-top: 20px;
            padding-top: 10px;
            border-top: 1px solid #ddd;
            text-align: center;
        }
    </style>
</head>

<body>
<div class="container">
    <h1>Order Details</h1>
    <p>Customer: <span th:text="${customerName}"></span></p>
    <p>Order ID: <span th:text="${orderReference}"></span></p>

    <table>
        <thead>
        <tr>
            <th>Product Name</th>
            <th>Quantity</th>
            <th>Price</th>
        </tr>
        </thead>
        <tbody>
        <tr th:each="product : ${products}">
            <td th:text="${product.name}"></td>
            <td th:text="${product.quantity}"></td>
            <td th:text="${product.price}"></td>
        </tr>
        </tbody>
    </table>

    <div class="footer">
        <p>Total Amount: $<span th:text="${totalAmount}"></span></p>
        <p>This is an automated message. Please do not reply to this email.</p>
        <p>&copy; 2024 AlibouCoding</span>. All rights reserved.</p>
    </div>
</div>
</body>

</html>

1.3.2 payment-confirmation.html

더보기
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Payment Confirmation</title>

    <style>
        body {
            font-family: Arial, sans-serif;
            line-height: 1.6;
            background-color: #f4f4f4;
            margin: 0;
            padding: 0;
        }

        .container {
            max-width: 600px;
            margin: 0 auto;
            padding: 20px;
            background-color: #fff;
            border-radius: 8px;
            box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
        }

        h1 {
            color: #333;
        }

        p {
            color: #555;
        }

        .button {
            display: inline-block;
            padding: 10px 20px;
            text-align: center;
            text-decoration: none;
            color: #fff;
            background-color: #007BFF;
            border-radius: 5px;
        }

        .footer {
            margin-top: 20px;
            padding-top: 10px;
            border-top: 1px solid #ddd;
            text-align: center;
        }
    </style>
</head>

<body>
<div class="container">
    <h1>Payment Confirmation</h1>
    <p>Dear <span th:text="${customerName}"></span>,</p>
    <p>Your payment of $<span th:text="${amount}"></span> has been successfully processed.</p>
    <p>Order reference: <span th:text="${orderReference}"></span></p>
    <p>Thank you for choosing our service. If you have any questions, feel free to contact us.</p>

    <div class="footer">
        <p>This is an automated message. Please do not reply to this email.</p>
        <p>&copy; 2024 AlibouCoding. All rights reserved.</p>
    </div>
</div>
</body>

</html>

2. Kafka

order, payment 성공에 대한 이벤트를 발신하면,

Kafka는 이를 받아 저장합니다.

이후 notification이 이벤트를 읽고,

DB에 저장 및 email을 발송합니다.

 

2.1 order-service

config-server의 order-service.yml에 카프카 설정을 합니다.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.type.mapping: orderConfirmation:com.ride.ecommerce.kafka.OrderConfirmation

bootstrap-servers: localhost:9092 는 Kafka 브로커가 실행되고 있는 서버의 주소와 포트 번호입니다.

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Kafka 메시지 키-값을 직렬화 합니다.

키를 직렬화하여, 파티션에 저장된 값을 쉽게 찾도록 하며 순서를 보장합니다.

spring.json.type.mapping: orderConfirmation:com.ride.ecommerce.kafka.OrderConfirmation

orderConfirmation 타입의 JSON 메시지를 com.ride.ecommerce.kafka.OrderConfirmation 클래스에 매핑합니다.

JSON 메시지를 해당 클래스의 객체로 변환할 수 있습니다.

2.1.1 config

KafkaOrderTopicConfig class

order topic 의존성을 주입합니다.

package com.ride.ecommerce.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaOrderTopicConfig {

    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder
                .name("order-topic")
                .build();
    }
}

2.1.2 kafka

OrderConfirmation record

Kafka 메시지로 관리할 자료형을 정의합니다.

package com.ride.ecommerce.kafka;

import com.ride.ecommerce.customer.CustomerResponse;
import com.ride.ecommerce.order.PaymentMethod;
import com.ride.ecommerce.product.PurchaseResponse;

import java.math.BigDecimal;
import java.util.List;

public record OrderConfirmation(
        String orderReference,
        BigDecimal totalAmount,
        PaymentMethod paymentMethod,
        CustomerResponse customer,
        List<PurchaseResponse> products
) {
}

OrderProducer class

order producer을 Spring container로 관리합니다.

order 확정 시, Kafka의 order-topic에 메시지를 보냅니다.

package com.ride.ecommerce.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import static org.springframework.kafka.support.KafkaHeaders.TOPIC;

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProducer {

    private final KafkaTemplate<String, OrderConfirmation> kafkaTemplate;

    public void sendOrderConfirmation(OrderConfirmation orderConfirmation) {
        log.info("Sending order confirmation");
        Message<OrderConfirmation> message = MessageBuilder
                .withPayload(orderConfirmation)
                .setHeader(TOPIC, "order-topic")
                .build();
        kafkaTemplate.send(message);
    }
}

2.1.3 order

OrderService class

위에서 정의한 Spring Bean을 주입합니다.

private final OrderProducer orderProducer;

order를 생성하고,

로직을 처리합니다.

OrderRequest 객체 데이터를 기반으로 sendOrderConfirmation 메서드를 호출합니다.

/**
 * Creates a new Order.
 * @param request the order request containing order details
 * @return the ID of the created order
 */
public Integer createOrder(OrderRequest request) {
    // check the customer --> openFeign
    var customer = this.customerClient.findCustomerById(request.customerId())
            .orElseThrow(() -> new BusinessException("Cannot create order:: No Customer exists with the provided ID"));
    // purchase the products --> product-ms (RestTemplate, RestClient)
    var purchasedProducts = this.productClient.purchaseProducts(request.products());
    // persist order
    var order = this.repository.save(mapper.toOrder(request));
    //persist order lines
    for (PurchaseRequest purchaseRequest : request.products()) {
        orderLineService.saveOrderLine(
                new OrderLineRequest(
                        null,
                        order.getId(),
                        purchaseRequest.productId(),
                        purchaseRequest.quantity()
                )
        );
    }
    // start payment process
    var paymentRequest = new PaymentRequest(
            request.amount(),
            request.paymentMethod(),
            order.getId(),
            order.getReference(),
            customer
    );
    paymentClient.requestOrderPayment(paymentRequest);

    // send the order confirmation --> notification-ms (kafka)
    orderProducer.sendOrderConfirmation(
            new OrderConfirmation(
                    request.reference(),
                    request.amount(),
                    request.paymentMethod(),
                    customer,
                    purchasedProducts
            )
    );
    return order.getId();
}

2.2 payment-service

config-server의 payment-service.yml에 카프카 설정을 합니다.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.type.mapping: paymentConfirmation:com.ride.ecommerce.notification.PaymentNotificationRequest

2.1.1 config

KafkaPaymentTopicConfig class

package com.ride.ecommerce.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaPaymentTopicConfig {
    @Bean
    public NewTopic paymentTopic() {
        return TopicBuilder.name("payment-topic")
                .build();
    }
}

2.1.2 notification(kafka)

PaymentNotificationRequest record

package com.ride.ecommerce.notification;

import com.ride.ecommerce.payment.PaymentMethod;

import java.math.BigDecimal;

public record PaymentNotificationRequest(
        String orderReference,
        BigDecimal amount,
        PaymentMethod paymentMethod,
        String customerFirstname,
        String customerLastname,
        String customerEmail
) {
}

NotificationProducer class

package com.ride.ecommerce.notification;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import static org.springframework.kafka.support.KafkaHeaders.TOPIC;

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationProducer {

    private final KafkaTemplate<String, PaymentNotificationRequest> kafkaTemplate;

    public void sendNotification(PaymentNotificationRequest request) {
        log.info("Sending notification with body <{}>", request);
        Message<PaymentNotificationRequest> message = MessageBuilder
                .withPayload(request)
                .setHeader(TOPIC, "payment-topic")
                .build();

        kafkaTemplate.send(message);
    }
}

2.1.3 payment

PaymentService

private final NotificationProducer notificationProducer;
/**
 * Creates a new payment and sends a notification.
 *
 * @param request the payment request containing payment details
 * @return the ID of the created payment
 */
public Integer createPayment(PaymentRequest request) {
    var payment = repository.save(mapper.toPayment(request));

    notificationProducer.sendNotification(
            new PaymentNotificationRequest(
                    request.orderReference(),
                    request.amount(),
                    request.paymentMethod(),
                    request.customer().firstname(),
                    request.customer().lastname(),
                    request.customer().email()
            )
    );
    return payment.getId();
}

2.3 notification-service

config-server의 notification-service.yml에 카프카 설정을 합니다.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: paymentGroup,orderGroup
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: '*' # com.ride.ecommerce
        spring.json.type.mapping: orderConfirmation:com.ride.ecommerce.kafka.order.OrderConfirmation,paymentConfirmation:com.ride.ecommerce.kafka.payment.PaymentConfirmation

notification은 Consumer입니다.

order와 payment에서 직렬화한 데이터를 deserializer옵션을 통해 역직렬화 합니다.

json 객체를 매핑해줄 때, 경로를 헛갈리지 않도록 합니다.

2.1.2 kafka

NotificationConsumer

각 Producer로 부터 수신한 메시지를 처리할 Consumer입니다.

메시지를 각 topic별로, 비동기 방식으로 처리합니다.

구독한 topic으로부터 메시지를 수신하면,

데이터를 db에 저장하고, 메일을 보냅니다.

package com.ride.ecommerce.kafka;

import com.ride.ecommerce.email.EmailService;
import com.ride.ecommerce.kafka.order.OrderConfirmation;
import com.ride.ecommerce.kafka.payment.PaymentConfirmation;
import com.ride.ecommerce.notification.Notification;
import com.ride.ecommerce.notification.NotificationRepository;
import jakarta.mail.MessagingException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;

import static com.ride.ecommerce.notification.NotificationType.ORDER_CONFIRMATION;
import static com.ride.ecommerce.notification.NotificationType.PAYMENT_CONFIRMATION;

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationConsumer {
    private final NotificationRepository repository;
    private final EmailService emailService;

    @KafkaListener(topics = "payment-topic")
    public void consumePaymentSuccessNotification(PaymentConfirmation paymentConfirmation) throws MessagingException {
        log.info(String.format("Consuming the message from payment-topic Topic:: %s", paymentConfirmation));
        repository.save(
                Notification.builder()
                        .type(PAYMENT_CONFIRMATION)
                        .notificationDate(LocalDateTime.now())
                        .paymentConfirmation(paymentConfirmation)
                        .build()
        );

        // todo send email
        var customerName = paymentConfirmation.customerFirstname() + " " + paymentConfirmation.customerLastname();
        emailService.sendPaymentSuccessEmail(
                paymentConfirmation.customerEmail(),
                customerName,
                paymentConfirmation.amount(),
                paymentConfirmation.orderReference()
        );
    }

    @KafkaListener(topics = "order-topic")
    public void consumePaymentSuccessNotification(OrderConfirmation orderConfirmation) throws MessagingException {
        log.info(String.format("Consuming the message from order-topic Topic:: %s", orderConfirmation));
        repository.save(
                Notification.builder()
                        .type(ORDER_CONFIRMATION)
                        .notificationDate(LocalDateTime.now())
                        .orderConfirmation(orderConfirmation)
                        .build()
        );

        // todo send email
        var customerName = orderConfirmation.customer().firstname() + " " + orderConfirmation.customer().lastname();
        emailService.sendOrderConfirmationEmail(
                orderConfirmation.customer().email(),
                customerName,
                orderConfirmation.totalAmount(),
                orderConfirmation.orderReference(),
                orderConfirmation.products()
        );
    }
}

 

이제 아래에 orer, payment 데이터의 역직렬화 시 참조될 자료형을 정의합니다.

2.1.3 kafka.order

Customer, OrderConfirmation, Product record

package com.ride.ecommerce.kafka.order;

public record Customer(
        String id,
        String firstname,
        String lastname,
        String email
) {
}
package com.ride.ecommerce.kafka.order;

import com.ride.ecommerce.kafka.payment.PaymentMethod;

import java.math.BigDecimal;
import java.util.List;

public record OrderConfirmation(
        String orderReference,
        BigDecimal totalAmount,
        PaymentMethod paymentMethod,
        Customer customer,
        List<Product> products
) {
}
package com.ride.ecommerce.kafka.order;

import java.math.BigDecimal;

public record Product(
        Integer productId,
        String name,
        String description,
        BigDecimal price,
        double quantity
) {
}

2.1.4 kafka.payment

PaymentConfirmation record, PaymentMethod enum

package com.ride.ecommerce.kafka.payment;

import java.math.BigDecimal;

public record PaymentConfirmation(
        String orderReference,
        BigDecimal amount,
        PaymentMethod paymentMethod,
        String customerFirstname,
        String customerLastname,
        String customerEmail
) {
}
package com.ride.ecommerce.kafka.payment;

public enum PaymentMethod {
    PAYPAL,
    CREDIT_CARD,
    VISA,
    MASTER_CARD,
    BITCOIN
}

 

728x90