8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

连接到在 Docker 中运行的 Kafka

Mukesh Sharma 2月前

141 0

我在本地机器上设置了一个单节点 Kafka Docker 容器,就像 Confluent 文档中描述的那样(步骤 2-3)。此外,我还公开了 Zookeeper 的端口 2181 和 Kafka 的端口...

我在本地机器上设置了一个单节点 Kafka Docker 容器,就像 Confluent 文档 (步骤 2-3)。

此外,我还公开了 Zookeeper 的端口 2181 和 Kafka 的端口 9092,以便我能够从本地机器上运行的客户端连接到它们:

$ docker run -d \
    -p 2181:2181 \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:4.1.0

$ docker run -d \
    --net=confluent \
    --name=kafka \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0

问题: 当我尝试从主机连接到 Kafka 时,连接失败,因为它 can't resolve address: kafka:9092 .

下面是我的 Java 代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();

例外情况:

java.io.IOException: Can't resolve address: kafka:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-2.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-2.0.0.jar:na]
    ... 7 common frames omitted

问题: 如何连接到在 Docker 中运行的 Kafka?我的代码是从主机运行的,而不是 Docker。

注意:我知道理论上我可以尝试使用 DNS 设置, /etc/hosts 但这是一种解决方法 - 它不应该是那样的。

这里 也有类似的问题 ,但它是基于 ches/kafka 图像的。我使用 confluentinc 基于图像的,这是不一样的。

帖子版权声明 1、本帖标题:连接到在 Docker 中运行的 Kafka
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Mukesh Sharma在本站《apache-spark》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 很确定这只适用于所有使用此类网络设置的 docker 容器。您本质上是在这里创建一个单独的网络(confluent),并且两个容器(zookeeper 和 kafka)可以相互通信,但您无法直接通过 localhost 从外部访问它。我认为如果您使用 /etc/hosts 它会起作用,但我不确定。但这不是一种解决方法,因为容器不在 localhost 上运行。它们在 confluent 网络上运行。如果指定 ip 地址而不是 localhost,它会起作用吗?

  • tl;dr - 从容器到主机的简单端口转发将 不起作用 不应修改 /etc/hosts 主机文件(例如

    在代理上将 advertised.listeners 值设置为 advertised.host.name (而不是 advertised.port Connection to node -1 (localhost/127.0.0.1:9092) ,则表示您的应用容器尝试连接到自身。您的应用容器是否也在运行 Kafka 代理进程?可能不是。

    2) 确保列出的服务器 bootstrap.servers 确实可解析。例如 ping IP/主机名,用于 netcat 检查端口... 如果您的客户端位于容器中,则需要 从容器 如果容器没有立即崩溃, docker exec 请使用它

    3) 如果从主机而不是另一个容器运行进程,则要验证端口是否在主机上正确映射,请确保显示 docker ps kafka 容器是从 映射的 0.0.0.0:<host_port> -> <advertised_listener_port>/tcp 。如果尝试从 Docker 网络外部运行客户端,端口必须匹配。您不需要在两个容器之间进行端口转发;使用链接/docker 网络


    以下答案使用 confluentinc docker 镜像来解决所提出的问题, 而不是 wurstmeister/kafka 。如果您 KAFKA_ADVERTISED_HOST_NAME 设置了变量,请将其删除(这是一个弃用的属性)

    以下部分尝试汇总使用其他镜像所需的所有详细信息。对于其他常用的 Kafka 镜像, it's all the same Apache Kafka 在容器中运行的 Apache Kafka。
    你只需要看 它是如何配置的 ,以及 哪些变量 使它如此。

    wurstmeister/kafka

    自 2023 年 10 月起,DockerHub 中不再存在此功能。无论如何,2022 年之后不再维护。

    请参阅有关 侦听器配置 ,另 请参阅其连接性 wiki .

    bitnami/kafka

    如果您想要一个小容器,请尝试这些。这些图像比 Confluent 图像小得多,并且比 wurstmeister . 参考其 README 进行监听器配置维护得更好。

    debezium/kafka

    此处提到了 相关文档 .

    注意 :已弃用广告主机和端口设置。广告 侦听器 涵盖两者。与 Confluent 容器类似,Debezium 可以使用 KAFKA_ 前缀代理设置来更新其属性。

    其他的

    • ubuntu/kafka 需要您通过 Docker 镜像参数添加 --override advertised.listeners=kafka:9092 ...我发现它比环境变量的可移植性更差,所以不推荐
    • spotify/kafka 已被弃用并且过时了。
    • fast-data-dev 或者 lensesio/box 非常适合一体化解决方案,包括 Schema Registry、Kafka Connect 等,但如果你想要 Kafka,它们就太臃肿了。此外,在一个容器中运行许多服务是 Docker 的反模式
    • 你自己 Dockerfile - 为什么?这些其他人的东西不完整吗?从拉取请求开始,而不是从头开始。

    有关补充阅读材料、 功能齐全的 docker-compose 和网络图,请参阅 @rmoff 的博客

    回答

    Confluent 快速入门(Docker)文档 假定所有生产和消费请求都在 Docker 网络内。

    您可以通过在其自己的容器内运行 Kafka 客户端代码来解决连接问题 kafka:9092 ,因为它使用 Docker 网络桥,但除此之外,您需要添加更多环境变量以在外部公开容器,同时仍使其在 Docker 网络中工作。

    首先添加一个协议映射 PLAINTEXT_HOST:PLAINTEXT ,将侦听器协议映射到 Kafka 协议

    钥匙: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
    价值: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

    然后在不同的端口上设置两个通告的监听器。( kafka 这里指的是docker容器名称;它也可能被命名 broker ,因此请仔细检查您的服务+主机名)。

    钥匙: KAFKA_ADVERTISED_LISTENERS
    价值: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

    请注意,此处的协议与上面协议映射设置左侧的值相匹配

    在运行容器时,添加 -p 29092:29092 主机端口映射,以及通告 PLAINTEXT_HOST 监听器。


    因此...( 使用上述设置 )

    如果 仍然 不起作用, KAFKA_LISTENERS 可以设置为包括 <PROTOCOL>://0.0.0.0:<PORT> 两个选项与广告设置和 Docker 转发端口匹配的位置

    客户端位于同一台机器上,不在容器中

    正如您所期望的那样,宣传 localhost 和相关端口将允许您连接到容器外部。

    换句话说,在 Docker 网络 之外 localhost:29092 用于引导服务器和 localhost:2181 Zookeeper(需要 Docker 端口转发)

    另一台机器上的客户端

    如果尝试从外部服务器连接,则需要公布 192.168.x.y 主机的外部主机名/ip(例如) 以及/代替本地主机 .
    仅通过端口转发来宣传本地主机是行不通的,因为 Kafka 协议仍将继续宣传您已配置的侦听器。

    此设置需要 Docker 端口转发路由器端口转发(以及防火墙/安全组更改),例如,您的容器在云中运行并且您想从本地机器与其交互。

    同一主机上容器中的客户端(或另一个代理)

    这是最不容易出错的配置;您可以直接使用 DNS 服务名称。

    在 Docker 网络中 运行应用程序时 ,使用 kafka:9092 (参见 PLAINTEXT 上面公布的侦听器配置)作为引导服务器和 zookeeper:2181 Zookeeper,就像任何其他 Docker 服务通信一样(不需要任何端口转发)


    如果你使用单独的 docker run 命令或 Compose 文件,则需要 network 使用 Compose networks 部分手动定义共享或 docker network --create


    请参阅完整 Confluent 堆栈的示例 Compose 文件 or 单个代理的 更简约文件

    如果使用多个代理,则它们需要使用唯一的主机名 + 公布的侦听器。 参见示例

    相关问题

    从 Docker(ksqlDB)连接到主机上的 Kafka

    附录

    对于任何对 Kubernetes 部署感兴趣的人:

  • CG33 2月前 0 只看Ta
    引用 4

    但是如果我只想使用 9092 怎么办?我的意思是外面的 9092,而不是 29092

  • Kyll 2月前 0 只看Ta
    引用 5

    @Maria 然后用 -p 9092:9092 更改端口映射 PLAINTEXT_HOST://localhost:9092。您可能仍希望在 Docker 网络内为容器设置单独的侦听器

  • 你的意思是在这种情况下我只需要 \'-p 9092:9092\' 而不需要 \'-p 9092:9092\' ?

  • @Maria 如果主机上有客户端,则需要公开某个端口。该端口取决于所宣传的侦听器的设置方式。

  • @Maria 我想我现在已经回答了 3 次了?是的,你可以,但前提是你必须更改为 PLAINTEXT_HOST://localhost:9092,与我的答案相比

  • 当你第一次连接到 kafka 节点时,它会返回所有 kafka 节点和要连接的 url。然后你的应用程序将尝试直接连接到每个 kafka。

    问题始终是 kafka 会给你什么 url ?这就是为什么会有 , KAFKA_ADVERTISED_LISTENERS kafka 将使用它来告诉世界如何访问它。

    现在对于您的用例,有多个小事情需要考虑:

    假设你设置 plaintext://kafka:9092

    • 可通过 docker 网络解析的 kafka URL
    • 如果您尝试从主系统或不在同一 docker 网络的另一个容器进行连接,则会失败,因为 kafka 无法解析名称。

    ==> 要解决这个问题,你需要有一个特定的 DNS 服务器,比如服务发现服务器,但这对于小东西来说是个大麻烦。或者你手动将名称设置 kafka 为每个容器的 ip /etc/hosts

    如果你设置 plaintext://localhost:9092

    • 如果你的系统有端口映射的话(启动 kafka 时使用 -p 9092:9092),那么这一切都没问题。
    • 如果您从容器上的应用程序进行测试(无论是否使用相同的 docker 网络),则此操作将会失败(localhost 是容器本身,而不是 kafka)

    ==> 如果你有这个并且希望在另一个容器中使用 kafka 客户端,解决这个问题的一种方法是共享两个容器的网络(相同的 ip)

    最后一个选项:在名称中设置一个 IP: plaintext://x.y.z.a:9092 (kafka 公布的 URL 不能是 0.0.0.0,如文档 https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners )

    这对于每个人来说都没问题...但是你如何获得 xyza 名称?

    唯一的方法是在启动容器时对此 ip 进行硬编码: docker run .... --net confluent --ip 10.x.y.z ... 。请注意,您需要将 ip 适配为 confluent 子网中的一个有效 ip。

  • 谢谢 (+1)。第二种选择对我来说很有效,我可能会坚持使用。如果没有更好的解决方案,我稍后会接受答案(正如您已经提到的,我们失去了在容器内建立连接的能力)。

  • 在 zookeeper 之前

    1. docker 容器运行 --name zookeeper -p 2181:2181 zookeeper

    卡夫卡之后

    1. docker 容器运行--name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.8.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ip_address_of_your_computer_but_not_localhost!!!:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

    在 kafka 消费者和生产者配置中

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    

    我按照这些规定来运行我的项目。祝你好运。

  • ip_address_of_your_computer_but_not_localhost ... Localhost 工作正常,如果你参考我的回答... Compose 会比 docker run 更好

  • 不应该是 localhost。因为你必须将容器视为外部系统。这就是为什么你应该将其指向计算机的 IP 地址,而不是其 localhost。

  • 您可以从容器进行端口转发,然后从 localhost 上的主机进行访问。您是否尝试过我的答案中列出的设置?或者阅读 rmoff.net/2018/08/02/kafka-listeners-explained ?

  • 解决这个问题最简单的方法是使用 -h 选项向代理添加自定义主机名

    docker run -d \
        --net=confluent \
        --name=kafka \
        -h broker-1 \
        -p 9092:9092 \
        -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
        -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
        -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
        confluentinc/cp-kafka:4.1.0
    

    并编辑你的 /etc/hosts

    127.0.0.1   broker-1
    

    并使用:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
    
  • 引用 16

    不。请不要编辑 /etc/hosts。使用此代码,您仍然会收到 UnknownHost: kafka... 的错误,只需在此处使用 127.0.0.1:9092,并将 KAFKA_ADVERTISED_LISTENERS 相应地设置为 PLAINTEXT://localhost:9092

  • 这使我能够访问 localhost:9092 M1 Mac 上的 Kafka 应用程序

    Key: KAFKA_ADVERTISED_LISTENERS
    Value: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    

    加上端口转发:

    ports
       - "9092:9092"
    

    最后,再次,对于我的设置,我必须以这种方式设置听众键

    Key: KAFKA_LISTENERS
    Value: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
    
  • 这实际上并没有为可接受的答案添加任何内容。\'M1 Mac\' 并不重要,因为所有 Docker 主机的行为都相同。此外,此处正在使用 \'来自 docker\' 端口 9092...

  •       KAFKA_BROKER_ID: 1
          KAFKA_ADVERTISED_HOST_NAME: kafka:9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    

    此配置工作正常

    确保从 docker 内部进行连接 kafka:29092

    从容器外部 localhost:9092

    完整有效的 docker compose 配置

        version: "3.3"
    
        services:
          zookeeper:
            image: confluentinc/cp-zookeeper:6.2.0
            container_name: zookeeper
            networks:
              - broker-kafka
            ports:
              - "2181:2181"
            environment:
              ZOOKEEPER_CLIENT_PORT: 2181
              ZOOKEEPER_TICK_TIME: 2000
              ALLOW_ANONYMOUS_LOGIN: yes
            volumes:
              - ./bitnami/zookeeper:/bitnami/zookeeper
        
          kafka:
            image: confluentinc/cp-kafka:6.2.0
            container_name: kafka
            networks:
              - broker-kafka
            depends_on:
              - zookeeper
            ports:
              - "9092:9092"
            expose:
              - "9092"
            environment:
              KAFKA_BROKER_ID: 1
              KAFKA_ADVERTISED_HOST_NAME: kafka:9092
              KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
              KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
              KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
              KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
              KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
              # KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
              # KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
              # KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
              KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
              # KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
              KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
            volumes:
              - ./bitnami/kafka:/bitnami/kafka
        
          kafdrop:
            image: obsidiandynamics/kafdrop
            container_name: kafdrop
            ports:
              - "9000:9000"
            expose:
              - "9000"
            networks:
              - broker-kafka
            environment:
              KAFKA_BROKERCONNECT: "PLAINTEXT://kafka:29092"
              JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
              SPRING_PROFILES_ACTIVE: "dev"
            depends_on:
              - kafka
              - zookeeper
        
          consumer:
            container_name: consumer
            build:
              context: ./consumer
              dockerfile: Dockerfile
            environment:
              - KAFKA_TOPIC_NAME=app
              - KAFKA_SERVER=kafka
              - KAFKA_PORT=29092
            ports:
              - 8001:8001
            restart: "always"
            depends_on:
              - zookeeper
              - kafka
              - publisher
              - kafdrop
            networks:
              - broker-kafka
        
          publisher:
            container_name: publisher
            build:
              context: ./producer
              dockerfile: Dockerfile
            environment:
              - KAFKA_TOPIC_NAME=app
              - KAFKA_SERVER=kafka
              - KAFKA_PORT=29092
            ports:
              - 8000:8000
            restart: "always"
            depends_on:
              - zookeeper
              - kafka
              - kafdrop
            networks:
              - broker-kafka
            volumes:
              - ./testproducer:/producer
        
        networks:
          broker-kafka:
            driver: bridge
    
  • 顺便说一句,您的容器数据并未在此处保存。另外,您是否已通过从其他机器进行连接测试过此操作?

返回
作者最近主题: