레거시/트러블슈팅

[Logstash] Input Kafka plugin, IllegalStateException: No entry found for connection xxx

Gurumee 2021. 9. 30. 18:27
반응형

개요

다음 메트릭 데이터 수집 파이프라인 구성 중에 발생한 문제이다.

  • CollectD(x3) -> logstash -> kafka(x3)

운영 환경은 다음과 같다.

  • OS: Amazon Linux2 (x7, collectd x3, logstash, kafka x3)
  • instance type: m5.large
  • Kafka의 모든 노드는 kafka, Zookeeper가 설치되어 있으며 클러스터로 구성되어 있다.

문제 상황

Kafka에 제대로 된 데이터 수집이 안되고 있어서 Logstash의 로그를 확인해 보았다. (/var/log/logstash/input.log에서 CollectD에 데이터가 수집되는 것을 확인하였다.)

$ tail -f /var/log/logstash/logstash-plain.log
...
[2021-09-30T08:50:34,929][ERROR][org.apache.kafka.clients.producer.internals.Sender] [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for connection 2
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233) [kafka-clients-2.1.0.jar:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
...

 

현재 문제가 되는 Logstash의 구성 파일은 다음과 같다.

 

/etc/logstash/conf.d/collectd-to-kafka.conf

input {
  udp {
     port => 25826
     buffer_size => 1452
     codec => collectd { }
  }
}


output {
  file {
    path => "/var/log/logstash/input.log"
    codec => rubydebug
  }
  kafka {
    bootstrap_servers => "10.0.0.10:9092,10.0.0.11:9092,10.0.0.12:9092" 
    codec => json
    topic_id => "system_metric"
  }
}

문제 해결

다행히, 같은 문제에 대해서 원인을 찾고 해결한 문서를 찾을 수 있었다. 결론만 내리자면, Logstash가 사용하는 카프카 클라이언트 라이브러리는 Kafka의 노드들을 찾을 때, 구성했던 노드 이름으로 찾게 된다. 내가 구성했던 카프카 클러스터의 정보는 다음과 같다.

 

/home/ec2-user/apps/kafka/config/server.properties

broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node1:9092
zookeeper.connect=node1:2181,node2:2181,node3/test
log.dirs=/tmp/kafka-logs-0

 

이 때, node*/etc/hosts에 private ip와 매핑되게끔 설정해두었다. 따라서 Logstash의 호스트에서도 같은 작업을 진행하면 된다. /etc/hosts에 private ip와 Kafka 노드의 이름을 알맞게 매핑시켜준다.

 

/etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost6 localhost6.localdomain6

10.0.0.11 node01
10.0.0.12 node02
10.0.0.13 node03

 

그 후 구성 파일을 다음과 같이 수정한다.

 

/etc/logstash/conf.d/collectd-to-kafka.conf

input {
  udp {
     port => 25826
     buffer_size => 1452
     codec => collectd { }
  }
}


output {
  # 이 설정은 디버깅 용이다. 나중에 삭제하는 것이 좋다.
  file {
    path => "/var/log/logstash/input.log"
    codec => rubydebug
  }
  kafka {
    bootstrap_servers => "node1:9092,node2:9092,node3:9092" # kafka 클러스터의 모든 노드
    codec => json
    topic_id => "system_metric"
  }
}

 

그 후 Logstash를 재시작한다.

$ sudo systemctl restart logstash

 

이제 다시 로그를 확인해보면 에러 문구 없이 정상적으로 CollectD에서 수집한 데이터를 Kafka로 넘기는 것을 확인할 수 있다.

$ tail -f /var/log/logstash/logstash-plain.log
....
[2021-09-30T08:54:52,674][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
[2021-09-30T08:54:52,675][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
[2021-09-30T08:54:52,728][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2b737e75 run>"}
[2021-09-30T08:54:52,814][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-09-30T08:54:52,950][INFO ][logstash.inputs.udp      ] Starting UDP listener {:address=>"0.0.0.0:25826"}
[2021-09-30T08:54:53,045][INFO ][logstash.inputs.udp      ] UDP listener started {:address=>"0.0.0.0:25826", :receive_buffer_bytes=>"106496", :queue_size=>"2000"}
[2021-09-30T08:54:53,161][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-09-30T08:54:54,743][INFO ][logstash.outputs.file    ] Opening file {:path=>"/var/log/logstash/input.log"}
[2021-09-30T08:54:55,096][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: CSLOEmwKT72WIXFuiNNGBg

참고

728x90
반응형