본문 바로가기
마루아라는 개발쟁이/JAVA

[Java 개발] Kafka 활용 가이드: Producer부터 Consumer까지 실습 코드로 배우기

by 마루아라 이야기 2025. 3. 19.
반응형

안녕하세요, 개발자 여러분! 💻 오늘은 Java 개발에서 꼭 알아두면 좋은 메시지 큐 시스템인 Kafka에 대해 알아보려고 해요. 분산 시스템과 대용량 데이터 처리가 중요해지면서 Kafka의 활용도가 매우 높아졌는데요, 이번 글에서는 Java 개발자가 Kafka를 어떻게 활용할 수 있는지 Producer부터 Consumer까지 상세한 코드 예제와 함께 살펴보겠습니다. 특히 실무에서 자주 사용되는 Spring Boot와의 연동 방법까지 다루니 끝까지 함께해 주세요! 🚀

Kafka란 무엇인가요?

Kafka는 LinkedIn에서 개발된 분산 스트리밍 플랫폼으로, 대용량의 데이터를 실시간으로 처리할 수 있는 메시징 시스템입니다. 전통적인 메시지 큐와 달리 Kafka는 높은 처리량, 내구성, 확장성을 제공하며, 마이크로서비스 아키텍처에서 중요한 역할을 합니다.

Kafka의 핵심 구성 요소로는 Producer(데이터 생산자), Consumer(데이터 소비자), Broker(메시지 저장 서버), Topic(메시지 카테고리) 등이 있어요. Java 애플리케이션에서는 Kafka 클라이언트 라이브러리를 사용하여 쉽게 Producer와 Consumer를 구현할 수 있습니다.

특히 이벤트 기반 아키텍처(Event-Driven Architecture)가 인기를 끌면서 Kafka의 중요성은 더욱 커지고 있어요. 실시간 데이터 분석, 로그 수집, 시스템 간 데이터 동기화 등 다양한 사용 사례에서 활용되고 있죠.

Java에서 Kafka 환경 설정하기

Java 애플리케이션에서 Kafka를 사용하기 위해서는 먼저 필요한 의존성과 환경 설정이 필요합니다. Maven이나 Gradle을 사용하여 Kafka 클라이언트 라이브러리를 추가해 주세요.

Maven 의존성 추가:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

Gradle 의존성 추가:

implementation 'org.apache.kafka:kafka-clients:3.5.0'

로컬 개발 환경에서 테스트하려면 Docker를 활용하는 것이 편리해요. 아래 docker-compose.yml 파일로 Kafka와 Zookeeper를 쉽게 설정할 수 있습니다.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

Kafka Producer 구현하기

Kafka Producer는 메시지를 생성하여 Kafka 토픽으로 전송하는 역할을 합니다. Java에서 Producer를 구현하는 방법을 살펴볼게요.

기본 Producer 구현 예제:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

public class MyKafkaProducer {
    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:29092";

    public static void main(String[] args) {
        // Producer 설정
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // 추가 설정 (선택사항)
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        // Producer 인스턴스 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        
        // 콘솔에서 메시지 입력받아 전송
        Scanner scanner = new Scanner(System.in);
        System.out.println("메시지를 입력하세요 (종료: exit):");
        
        String message;
        while (!(message = scanner.nextLine()).equals("exit")) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("메시지 전송 성공: " + metadata.topic() + ", 파티션: " + metadata.partition());
                } else {
                    System.err.println("메시지 전송 실패: " + exception.getMessage());
                }
            });
            System.out.print("메시지를 입력하세요 (종료: exit): ");
        }
        
        producer.flush();
        producer.close();
        scanner.close();
    }
}

위 코드에서 주목할 점은 Producer 설정을 위한 Properties 객체와 비동기 콜백을 사용한 메시지 전송 방식입니다. 실제 프로젝션에서는 메시지 전송 결과를 처리하는 방식을 프로젝트 요구사항에 맞게 조정해야 해요.

Kafka Consumer 구현하기

Kafka Consumer는 토픽에서 메시지를 읽어오는 역할을 합니다. Consumer 그룹을 통해 여러 Consumer가 협력하여 메시지를 소비할 수 있어요.

기본 Consumer 구현 예제:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyKafkaConsumer {
    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:29092";
    private static final String GROUP_ID = "my-group";

    public static void main(String[] args) {
        // Consumer 설정
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Consumer 인스턴스 생성 및 토픽 구독
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        
        System.out.println(TOPIC_NAME + " 토픽 메시지 대기 중...");
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("수신된 메시지: " + record.value() + 
                                       ", 토픽: " + record.topic() + 
                                       ", 파티션: " + record.partition() + 
                                       ", 오프셋: " + record.offset());
                }
            }
        } catch (Exception e) {
            System.err.println("Consumer 예외 발생: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}

Consumer 설정에서 AUTO_OFFSET_RESET_CONFIG를 "earliest"로 설정하면 토픽의 처음부터 메시지를 읽어옵니다. "latest"로 설정하면 가장 최근 메시지부터 읽어오게 됩니다. 상황에 맞게 설정해 주세요.

Spring Boot와 Kafka 연동하기

Spring Boot 애플리케이션에서는 Spring Kafka 라이브러리를 사용하면 더 쉽게 Kafka를 연동할 수 있어요. 어노테이션 기반으로 Producer와 Consumer를 구현할 수 있습니다.

의존성 추가 (build.gradle):

implementation 'org.springframework.kafka:spring-kafka'

application.yml 설정:

spring:
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Producer 서비스 구현:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String TOPIC_NAME = "my-topic";
    
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC_NAME, message)
            .addCallback(
                result -> System.out.println("메시지 전송 성공: " + message),
                ex -> System.err.println("메시지 전송 실패: " + ex.getMessage())
            );
    }
}

Consumer 서비스 구현:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consumeMessage(String message) {
        System.out.println("수신된 메시지: " + message);
        // 비즈니스 로직 처리
    }
}

실전 Kafka 활용 팁과 주의사항

1. 적절한 파티션 수 설정하기

Kafka 토픽의 파티션 수는 병렬 처리 수준을 결정합니다. 처리량이 많은 시스템에서는 파티션 수를 늘려 병렬 처리를 높이는 것이 좋지만, 파티션이 많아지면 관리 부담도 증가해요. 일반적으로 Consumer 인스턴스 수와 동일하거나 약간 많게 설정하는 것이 좋습니다.

2. Consumer 그룹 관리

같은 그룹 ID를 가진 Consumer들은 토픽의 파티션을 나누어 소비합니다. 장애 발생 시 다른 Consumer가 자동으로 파티션을 인계받아 처리하므로 고가용성을 확보할 수 있어요.

3. Producer 배치 처리

메시지 처리량이 많은 경우, Producer의 배치 설정을 조정하여 성능을 향상시킬 수 있습니다.

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
          properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);

4. 에러 핸들링

메시지 처리 실패 시 재시도 로직이나 데드 레터 큐(Dead Letter Queue)를 구현하여 메시지 손실을 방지하세요. Spring Kafka에서는 @KafkaListener 어노테이션에 에러 핸들러를 지정할 수 있습니다.

Kafka로 확장 가능한 시스템 구축하기

Java와 Kafka를 조합하면 확장성이 뛰어난 분산 시스템을 구축할 수 있습니다. 이번 글에서는 Java 개발자가 Kafka를 시작하는 데 필요한 기본 지식과 코드 예제를 살펴보았습니다. Producer와 Consumer의 구현 방법, Spring Boot와의 연동, 그리고 실전 활용 팁까지 다루었는데요.

실제 프로젝트에 적용할 때는 메시지 직렬화/역직렬화, 보안 설정, 모니터링 등 추가 고려사항이 있습니다. 이러한 고급 주제는 다음 글에서 더 자세히 다루도록 하겠습니다.

여러분도 지금 바로 Java와 Kafka를 활용한 프로젝트를 시작해 보세요! 질문이나 의견이 있으시면 언제든지 댓글로 남겨주세요. 함께 성장하는 개발자 커뮤니티를 만들어 가요! 📚✨

반응형