-
[Logstash] Input Kafka plugin, IllegalStateException: No entry found for connection xxx레거시/트러블슈팅 2021. 9. 30. 18:27반응형
개요
다음 메트릭 데이터 수집 파이프라인 구성 중에 발생한 문제이다.
- CollectD(x3) -> logstash -> kafka(x3)
운영 환경은 다음과 같다.
- OS: Amazon Linux2 (x7, collectd x3, logstash, kafka x3)
- instance type: m5.large
- Kafka의 모든 노드는 kafka, Zookeeper가 설치되어 있으며 클러스터로 구성되어 있다.
문제 상황
Kafka
에 제대로 된 데이터 수집이 안되고 있어서Logstash
의 로그를 확인해 보았다. (/var/log/logstash/input.log
에서CollectD
에 데이터가 수집되는 것을 확인하였다.)$ tail -f /var/log/logstash/logstash-plain.log ... [2021-09-30T08:50:34,929][ERROR][org.apache.kafka.clients.producer.internals.Sender] [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for connection 2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233) [kafka-clients-2.1.0.jar:?] at java.lang.Thread.run(Thread.java:829) [?:?] ...
현재 문제가 되는
Logstash
의 구성 파일은 다음과 같다./etc/logstash/conf.d/collectd-to-kafka.conf
input { udp { port => 25826 buffer_size => 1452 codec => collectd { } } } output { file { path => "/var/log/logstash/input.log" codec => rubydebug } kafka { bootstrap_servers => "10.0.0.10:9092,10.0.0.11:9092,10.0.0.12:9092" codec => json topic_id => "system_metric" } }
문제 해결
다행히, 같은 문제에 대해서 원인을 찾고 해결한 문서를 찾을 수 있었다. 결론만 내리자면,
Logstash
가 사용하는 카프카 클라이언트 라이브러리는Kafka
의 노드들을 찾을 때, 구성했던 노드 이름으로 찾게 된다. 내가 구성했던 카프카 클러스터의 정보는 다음과 같다./home/ec2-user/apps/kafka/config/server.properties
broker.id=0 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://node1:9092 zookeeper.connect=node1:2181,node2:2181,node3/test log.dirs=/tmp/kafka-logs-0
이 때,
node*
은/etc/hosts
에 private ip와 매핑되게끔 설정해두었다. 따라서Logstash
의 호스트에서도 같은 작업을 진행하면 된다./etc/hosts
에 private ip와Kafka
노드의 이름을 알맞게 매핑시켜준다./etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost6 localhost6.localdomain6 10.0.0.11 node01 10.0.0.12 node02 10.0.0.13 node03
그 후 구성 파일을 다음과 같이 수정한다.
/etc/logstash/conf.d/collectd-to-kafka.conf
input { udp { port => 25826 buffer_size => 1452 codec => collectd { } } } output { # 이 설정은 디버깅 용이다. 나중에 삭제하는 것이 좋다. file { path => "/var/log/logstash/input.log" codec => rubydebug } kafka { bootstrap_servers => "node1:9092,node2:9092,node3:9092" # kafka 클러스터의 모든 노드 codec => json topic_id => "system_metric" } }
그 후
Logstash
를 재시작한다.$ sudo systemctl restart logstash
이제 다시 로그를 확인해보면 에러 문구 없이 정상적으로
CollectD
에서 수집한 데이터를Kafka
로 넘기는 것을 확인할 수 있다.$ tail -f /var/log/logstash/logstash-plain.log .... [2021-09-30T08:54:52,674][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0 [2021-09-30T08:54:52,675][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f [2021-09-30T08:54:52,728][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2b737e75 run>"} [2021-09-30T08:54:52,814][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2021-09-30T08:54:52,950][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:25826"} [2021-09-30T08:54:53,045][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:25826", :receive_buffer_bytes=>"106496", :queue_size=>"2000"} [2021-09-30T08:54:53,161][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} [2021-09-30T08:54:54,743][INFO ][logstash.outputs.file ] Opening file {:path=>"/var/log/logstash/input.log"} [2021-09-30T08:54:55,096][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: CSLOEmwKT72WIXFuiNNGBg
참고
728x90'레거시 > 트러블슈팅' 카테고리의 다른 글