2020-11-25-kafka.md

3년만에 다시 작성하는 kafka 추가 내용.

kafka version 1.1.1

프로듀서의 옵션

  • enable.idempotence 옵션을 true로 설정:
    멱등성 : 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질 [원자성 보장]
    enable.idempotence를 true로 설정하면, 카프카 프로듀서(Producer)가 메시지를 전송할 때 이중 전송 및 재전송을 방지합니다.
    즉, 메시지가 중복으로 전송되지 않도록 보장합니다. 이는 메시지 전달의 신뢰성을 높이는 데 도움이 됩니다.

3개의 브로커에 최소 복제2를 만족하면 전송 성공


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Properties 객체를 생성하고 설정 값 할당
Properties props = new Properties();
props.put("bootstrap.servers", "host etc .... ");
props.put("retries", 1);     // 재시도 횟수
props.put("batch.size", 65536); // 크기기반 일괄처리
props.put("linger.ms", 100);   // 시간기반 일괄처리
props.put("buffer.memory", 33554432); // 전송대기 버퍼
props.put("max.request.size", 1048576); // 요청 최대크기
props.put("min.insync.replicas", 2); // 최소 복제
props.put("enable.idempotence", true); // 데이터 중복 개선.
props.put("max.in.flight.requests.per.connection", 1); // 수신 미확인 요청의 최대 수
props.put("request.timeout.ms", 5000); // 전송후 request 타임아웃 시간 RETRY
props.put("max.block.ms", 100); // 전송후 응답시간
props.put("compression.type", "gzip");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Kafka 프로듀서를 생성
Producer<String, String> producer = new KafkaProducer<>(props);

// 메시지를 보내려면 ProducerRecord를 생성하고 send 메서드로 전송
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value");
producer.send(record);

// 프로듀서를 닫아 리소스를 정리
producer.close();

  • acks 옵션을 all로 설정:
    acks 옵션을 all로 설정하면 프로듀서가 메시지를 전송한 후 해당 메시지가 모든 인스턴스에 성공적으로 복제될 때까지 대기합니다.
    이는 메시지의 안정성을 높이는데 도움이 되며, 메시지가 분실되지 않도록 보장합니다.
    그러나 전송 지연이 발생할 수 있습니다.

  • replication.factor 옵션을 3으로 설정:
    replication.factor를 3으로 설정하면 카프카 토픽의 파티션(Partition)이 세 개의 복제본을 가지게 됩니다.
    이는 데이터의 내고장성을 보장하고, 하나 이상의 브로커(Broker)가 실패해도 데이터 손실을 방지합니다.
    따라서 데이터의 안정성을 높이는데 도움이 됩니다.

  • min.insync.replicas 옵션을 2로 설정:
    min.insync.replicas를 2로 설정하면 메시지를 전송할 때 적어도 두 개의 복제본이 동기화되어야 한다는 조건을 만족해야 합니다.
    이것은 데이터의 내고장성을 보장하기 위해 사용됩니다.
    즉, 적어도 두 개의 복제본이 성공적으로 메시지를 수신한 경우에만 프로듀서가 성공으로 간주됩니다.
    이렇게 설정 옵션을 변경함으로써, 카프카에서 데이터의 신뢰성과 내고장성을 높일 수 있습니다.
    그러나 이러한 설정 변경은 성능 및 지연에 영향을 미칠 수 있으므로 신중하게 고려해야 합니다.

전송 실패시 아래 이미지의 retry에 파일이 생기고 retryok로 이동

  • retryok 에 있는 재처리완료 파일은 3일후 자동 삭제.
    [DevOps + AI] 카프카, 대규모 클러스터 운영 후기 / if(kakao)dev2022 https://www.youtube.com/watch?v=SuHtHQkRV7g 를 보면, retry 가 아닌, For Fault 라는 별도의 카프카를 또 하나 두어 장애처리는 하는 것을 보고 더 괜찮은 생각이라 생각이 들었으나, 파일로 저장 후 후처리도 그때 당시에는 괜찮은 전략이었습니다.

장애시 처리 / 복구 절차 - 카프카

  1. kafka ISR 그룹에 브로커들이 정상적으로 존재하는지 상태 확인
    $ bin/kafka-topics.sh -describe –zookeeper localhost:2181
    http://:/clusters/kafka/topics/xxxData

  2. kafka 서버 ISR 상태에 따라 순차적인 재시작, 최소 서버 2대이상 살아있어야 안정적인 운영가능.
    현재 카프카는 디스크 full로 다운된 현상 말고는 문제가 없음.

  3. 처리 현황파악
    3-1. 각 was의 에러로그 확인
    $ curl http://:/cronwork/monitoring/kafka_error_retrycnt.txt
    3-2. 텔레그램 연결하여, 알람으로온 전송지연서버의 producer MBeans (record-send-rate) 확인
    3-4. 재처리 진행 여부 확인 : 각 서버의 error log 의 재처리 진행 상황 확인
    $ grep KafkaError /home/xxx/logs/log4j/xxx.infolog.log
    3-5. 각 was 의 컨슈머서버의 인스턴스 확인 http://:/clusters/kafka/consumers/group-billing/topic/xxxx/type/KF Consumer Offset 의 마지막 Offset 값을 확인 3-6. 컨슈머의 파티션별 offset 을 마지막 offset으로 위치 이동하고 $ bin/kafka-consumer-groups.sh –bootstrap-server : –group group2 –reset-offsets –to-latest –all-topics –execute 3-7. row단위 데이터 에서 1번에서 확인된 번호의 시간대부터 재시작한 시간의 범위를 재처리 컨슈머로 구동.


카프카는 이벤트 드리븐 아키텍쳐와 스트림 데이터 파이프라인에서 중요한 역활을 수행하며, 스트림 이벤트를 다루는 데 특화되어 있는 카프카는 실시간 데이터를 처리하는데 적합합니다.
시간 단위 이벤트 데이터를 다루기 위한 타임스탬프 순서를 보장하기 위한 파티션과 메시지 키와 같은 기능이 카프카에 포함되어이 있습니다.

메시지 전달 신뢰성

정확히 한번 (Exactly-once) : 모든 데이터 처리가 단 한번만 처리. 이벤트 데이터가 발생되었을때 단 한번만 처리됨.

적어도 한번 (At least once) : 네트워크, 브로커 장애 등에 의해서 데이터를 중복 발행, 수행, 적재 됨을 의미. 어느 누구도 은행에서 이체되었을떄 한번만 되길 원하지 2번 이상 되길 원하지 않음.

최대 한번 (At most once) : 관련 장애에 의해 유실되지 않는 것을 원함.