Kafka使用Java客户端进行访问的示例代码

发布时间 - 2026-01-11 03:09:00    点击率:

本文环境如下:

操作系统:CentOS 6 32位

JDK版本:1.8.0_77 32位

Kafka版本:0.9.0.1(Scala 2.11)

1. maven依赖包

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>

2. 生产者代码

package com.lnho.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for(int i = 0; i < 100; i++)
      producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));

    producer.close();
  }
}

3. 消费者代码

package com.lnho.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic1"));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
  }
}


4. 执行程序

lib底下需要有:kafka-clients-0.9.0.1.jar log4j-1.2.17.jar slf4j-api-1.7.6.jar slf4j-log4j12-1.7.6.jar

生产者:

复制代码 代码如下:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaProducerExample

消费者:

复制代码 代码如下:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaConsumerExample

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


# Kafka  # JAVA客户端  # Java客户端访问  # Kafka简单客户端编程实例  # Java Kafka分区发送及消费实战  # 关于Kafka消费者订阅方式  # Springboot集成Kafka进行批量消费及踩坑点  # Kafka消费客户端协调器GroupCoordinator详解  # 大家多多  # 操作系统  # Properties  # ProducerRecord  # util  # public  # void  # static  # KafkaProducerExample  # lnho  # package  # java  # import  # Producer  # KafkaProducer 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: *服务器网站为何频现安全漏洞?  Laravel如何实现事件和监听器?(Event & Listener实战)  如何在Windows虚拟主机上快速搭建网站?  Laravel怎么配置不同环境的数据库_Laravel本地测试与生产环境动态切换【方法】  想要更高端的建设网站,这些原则一定要坚持!  悟空识字怎么关闭自动续费_悟空识字取消会员自动扣费步骤  创业网站制作流程,创业网站可靠吗?  魔毅自助建站系统:模板定制与SEO优化一键生成指南  Laravel如何实现用户角色和权限系统_Laravel角色权限管理机制  如何制作公司的网站链接,公司想做一个网站,一般需要花多少钱?  Laravel怎么使用artisan命令缓存配置和视图  Laravel中间件如何使用_Laravel自定义中间件实现权限控制  Laravel怎么上传文件_Laravel图片上传及存储配置  百度浏览器如何管理插件 百度浏览器插件管理方法  如何制作新型网站程序文件,新型止水鱼鳞网要拆除吗?  手机钓鱼网站怎么制作视频,怎样拦截钓鱼网站。怎么办?  Linux后台任务运行方法_nohup与&使用技巧【技巧】  如何在IIS7中新建站点?详细步骤解析  laravel服务容器和依赖注入怎么理解_laravel服务容器与依赖注入解析  免费的流程图制作网站有哪些,2025年教师初级职称申报网上流程?  JavaScript常见的五种数组去重的方式  Laravel请求验证怎么写_Laravel Validator自定义表单验证规则教程  Laravel Livewire是什么_使用Laravel Livewire构建动态前端界面  laravel怎么实现图片的压缩和裁剪_laravel图片压缩与裁剪方法  哪家制作企业网站好,开办像阿里巴巴那样的网络公司和网站要怎么做?  浅谈redis在项目中的应用  C语言设计一个闪闪的圣诞树  如何用低价快速搭建高质量网站?  在线制作视频网站免费,都有哪些好的动漫网站?  Laravel怎么为数据库表字段添加索引以优化查询  个人网站制作流程图片大全,个人网站如何注销?  如何在万网主机上快速搭建网站?  Swift中switch语句区间和元组模式匹配  专业商城网站制作公司有哪些,pi商城官网是哪个?  如何正确选择百度移动适配建站域名?  Windows家庭版如何开启组策略(gpedit.msc)?(安装方法)  如何快速查询域名建站关键信息?  Laravel如何升级到最新版本?(升级指南和步骤)  HTML透明颜色代码怎么让图片透明_给img元素加透明色的技巧【方法】  Java类加载基本过程详细介绍  Python并发异常传播_错误处理解析【教程】  如何在宝塔面板中修改默认建站目录?  如何快速搭建高效香港服务器网站?  如何快速登录WAP自助建站平台?  车管所网站制作流程,交警当场开简易程序处罚决定书,在交警网站查询不到怎么办?  如何快速选择适合个人网站的云服务器配置?  Linux系统命令中tree命令详解  如何彻底删除建站之星生成的Banner?  php读取心率传感器数据怎么弄_php获取max30100的心率值【指南】  如何制作一个表白网站视频,关于勇敢表白的小标题?