8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

连接到在 Docker 中运行的 Kafka

Qualcuno2 1月前

38 0

我在本地机器上设置了一个单节点 Kafka Docker 容器,就像 Confluent 文档中描述的那样(步骤 2-3)。此外,我还公开了 Zookeeper 的端口 2181 和 Kafka 的端口...

我在本地机器上设置了一个单节点 Kafka Docker 容器,就像 Confluent 文档 (步骤 2-3)。

此外,我还公开了 Zookeeper 的端口 2181 和 Kafka 的端口 9092,以便我能够从本地机器上运行的客户端连接到它们:

$ docker run -d \
    -p 2181:2181 \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:4.1.0

$ docker run -d \
    --net=confluent \
    --name=kafka \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0

问题: 当我尝试从主机连接到 Kafka 时,连接失败,因为它 can't resolve address: kafka:9092 .

下面是我的 Java 代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();

例外情况:

java.io.IOException: Can't resolve address: kafka:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-2.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-2.0.0.jar:na]
    ... 7 common frames omitted

问题: 如何连接到在 Docker 中运行的 Kafka?我的代码是从主机运行的,而不是 Docker。

注意:我知道理论上我可以尝试使用 DNS 设置, /etc/hosts 但这是一种解决方法 - 它不应该是那样的。

这里 也有类似的问题 ,但它是基于 ches/kafka 图像的。我使用 confluentinc 基于图像的,这是不一样的。

帖子版权声明 1、本帖标题:连接到在 Docker 中运行的 Kafka
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Qualcuno2在本站《apache-spark》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 在 zookeeper 之前

    1. docker 容器运行 --name zookeeper -p 2181:2181 zookeeper

    卡夫卡之后

    1. docker 容器运行--name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.8.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ip_address_of_your_computer_but_not_localhost!!!:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

    在 kafka 消费者和生产者配置中

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    

    我按照这些规定来运行我的项目。祝你好运。

返回
作者最近主题: