《Kafka安装配置及使用说明.pdf》由会员分享,可在线阅读,更多相关《Kafka安装配置及使用说明.pdf(56页珍藏版)》请在taowenge.com淘文阁网|工程机械CAD图纸|机械工程制图|CAD装配图下载|SolidWorks_CaTia_CAD_UG_PROE_设计图分享下载上搜索。
1、KafkaKafka 安装配置及使用说明安装配置及使用说明铁树 2018-08-08Windows 平台,5 个分布式节点,修改消息大小,调用程序范例1 1 安装配置安装配置采 用5台 服 务 器 作 为 集 群 节 点,IP地 址 为:XX.XX.0.12-XX.XX.0.16.每台机器依次安装配置 JDK、zookeeper、kafka,先安装完一台机器,然后拷贝到其他机器,再修改配置文件。1.11.1 JDKJDK 安装配置安装配置JDK 版本:jdk1.7.0_51_x64 解压版jdk1.7.0_51_x64.rar解压到 C 盘 kafka 目录下,如下图。1设置环境变量:JAVA
2、_HOME:C:kafkajdk1.7.0_51_x64PATH:C:kafkajdk1.7.0_51_x64bin1.21.2 zookeeperzookeeper 安装配置安装配置1.2.11.2.1 解压安装解压安装zookeeper 版本:3.4.12 zookeeper-3.4.12.tar.gz2解压到 C 盘 kafka 目录下,如下图。1.2.21.2.2 创建创建 zookeeperzookeeper 数据目录和日志目录数据目录和日志目录zkdata#存放快照C:kafkazookeeper-3.4.12zkdatazkdatalog#存放日志C:kafkazookeeper
3、-3.4.12zkdatalog31.2.31.2.3 修改配置文件进入到“C:kafkazookeeper-3.4.12”目录下的 conf 目录中,复制 zoo_sample.cfg官方提供的 zookeeper 的样板文件,重命名为zoo.cfg官方指定的文件命名规则。默认内容:4修改后配置文件为:#The number of milliseconds of each tick5tickTime=2000#The number of ticks that the initial#synchronization phase can takeinitLimit=10#The number o
4、f ticks that can pass between#sending a request and getting an acknowledgementsyncLimit=5#the directory where the snapshot is stored.#do not use/tmp for storage,/tmp here is just#example sakes.dataDir=C:/kafka/zookeeper-3.4.12/zkdatadataLogDir=C:/kafka/zookeeper-3.4.12/zkdatalog#the port at which th
5、e clients will connectclientPort=12181server.1=XX.XX.0.12:12888:13888server.2=XX.XX.0.13:12888:13888server.3=XX.XX.0.14:12888:13888server.4=XX.XX.0.15:12888:13888server.5=XX.XX.0.16:12888:13888#the maximum number of client connections.#increase this if you need to handle more clients6#maxClientCnxns
6、=60#Be sure to read the maintenance section of the#administrator guide before turning on autopurge.#:/zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance#The number of snapshots to retain in dataDirautopurge.snapRetainCount=100#Purge task interval in hours#Set to 0 to disable auto pu
7、rge featureautopurge.purgeInterval=24配置文件解释:#tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。#initLimit:这个配置项是用来配置 Zookeeper 接受客户端这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器7集群中连接到 Leader 的 Follower 服务器 初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10个心跳的时间也就是tickTime 长度后 Zookeeper
8、 服务器还没有收到客户端的返回信息,那么说明这个客户端连接失败。总的时间长度就是 10*2000=20 秒#syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒#dataDir:快照日志的存储路径#dataLogDir:事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多#clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口
9、,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点通过配置 autopurge.snapRetainCount 和autopurge.purgeInterval 这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:autopurge.purgeInterval这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。8autopurge.snapRetainCountautopurge.snapRetainCount这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。1.2.41.
10、2.4 创建创建 myidmyid 文件文件在“C:kafkazookeeper-3.4.12zkdata”目录下,创建 myid文件无后缀名,内容为对应 IP 地址的主机号。如 server.1 则内容为 1。1.31.3 KafkaKafka 安装配置安装配置1.3.11.3.1 解压安装解压安装kafka 版本:kafka1.1.1kafka_2.11-1.1.1.tgz解压到 C 盘 kafka 目录下,如下图。91.3.21.3.2 创建消息目录创建消息目录kafkalogs:C:kafkakafka_2.11-1.1.1kafkalogs1.3.31.3.3 修改配置文件修改配置文
11、件打开 C:kafkakafka_2.11-1.1.1config server.properties实际的修改项为:10broker.id=1listeners=PLAINTEXT:/:19092log.dirs=C:/kafka/kafka_2.11-1.1.1/kafkalogs#在 log.retention.hours=168 下面新增下面三项消息大小最大 1GBmessage.max.byte=1073741824replica.fetch.max.bytes=1073741824log.segment.bytes=1073741824default.replication.fac
12、tor=2#设置 zookeeper 的连接端口zookeeper.connect=XX.XX.0.12:12181,XX.XX.0.13:12181,XX.XX.0.14:12181,XX.XX.0.15:12181,XX.XX.0.16:12181配置说明:broker.id=0#当前机器在集群中的唯一标识,和 zookeeper 的myid 性质一样port=19092#当前 kafka 对外提供服务的端口默认是 9092host.name=192.168.7.100#这个参数默认是关闭的,在 0.8.1有个 bug,DNS 解析问题,失败率的问题。work.threads=3#这个是b
13、orker 进行网络处理的线程数11num.io.threads=8#这个是 borker 进行 I/O 处理的线程数log.dirs=/opt/kafka/kafkalogs/#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的 num.io.threads 要大于这个目录的个数这个目录,如果配置多个目录,新创建的 topic 他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个socket.send.buffer.bytes=102400#发送缓冲区 buffer 大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能socke
14、t.receive.buffer.bytes=102400#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘socket.request.max.bytes=104857600#这个参数是向kafka 请求消息或者向 kafka 发送消息的请请求的最大数,这个值不能超过java 的堆栈大小num.partitions=1#默认的分区数,一个topic 默认 1 个分区数log.retention.hours=168#默认消息的最大持久化时间,168小时,7 天message.max.byte=5242880#消息保存的最大值 5Mdefault.replication.factor
15、=2#kafka 保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务replica.fetch.max.bytes=5242880#取消息的最大字节数log.segment.bytes=1073741824#这个参数是:因为kafka 的消12息是以追加的形式落地到文件,当超过这个值的时候,kafka 会新起一个文件log.retention.check.interval.ms=300000#每隔 300000 毫秒去检查上面配置的 log 失效时间log.retention.hours=168 ,到目录查看是否有过期的消息如果有,删除log.cleaner.enable=fals
16、e#是否启用log 压缩,一般不用启用,启用的话可以提高性能zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218#设置 zookeeper 的连接端口1.41.4 其他节点配置其他节点配置将安装以上配置好的目录c:kafka拷贝到其他节点的c盘目录,并修改如下配置。1 1、JAVAJAVA 环境变量:环境变量:JAVA_HOME:C:kafkajdk1.7.0_51_x64PATH:C:kafkajdk1.7.0_51_x64bin2 2、zookeeperzookeeper 的的 myidmy
17、idC:kafkazookeeper-3.4.12zkdatamyid,修改为对应的数值XX.XX.0.12:1XX.XX.0.13:2XX.XX.0.14:313XX.XX.0.15:4XX.XX.0.16:53 3、kafkakafka 配置配置C:kafkakafka_2.11-1.1.1config server.properties的broker.id,修改为对应的数值XX.XX.0.12:1XX.XX.0.13:2XX.XX.0.14:3XX.XX.0.15:4XX.XX.0.16:51.51.5 服务启动服务启动1 1、启动启动 zookeeperzookeeperC:kafka
18、zookeeper-3.4.12binzkServer dXX.XX.0.12-16,依次双击启动。142 2、启动启动 kafkakafka运行 cmd,cd C:kafkakafka_2.11-1.1.1 目录,再执行命令:【cd C:kafkakafka_2.11-1.1.1】C:kafkakafka_2.11-1.1.1.binwindowskafka-server-start.bat.configserver.properties151.61.6 服务状态测试服务状态测试1.6.11.6.1 创建创建 TopicsTopics打开 cmd 进入 C:kafkakafka_2.11-1
19、.1.1binwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-topics.bat-create-zookeeper localhost:12181-replication-factor 1-partitions 1-topic test001161.6.21.6.2 打开一个打开一个 ProducerProducer打开 cmd 进入 C:kafkakafka_2.11-1.1.1binwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-console-producer.bat-broker-list loc
20、alhost:19092-topic test001等待输入消息内容。1.6.31.6.3 打开一个打开一个 ConsumerConsumer打开 cmd 进入 C:kafkakafka_2.11-1.1.1binwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-console-consumer.bat-zookeeper localhost:12181-topic test00117然后就可以在 Producer 控制台窗口输入消息了,很快Consumer窗口就会显示出 Producer 发送的消息。181.6.41.6.4 查看所有主题查看所有主题
21、C:UsersDevelopC:kafkakafka_2.11-1.1.1binwindowskafka-topics.bat-list-zookeeper localhost:121811.6.51.6.5 查看查看 TopicTopic分区和副本分区和副本C:UsersDevelopC:kafkakafka_2.11-1.1.1binwindowskafka-topics.bat-describe-zookeeper localhost:12181191.71.7 消息大小调整消息大小调整Kafka 对于 10KB 大小的消息吞吐率最好,默认配置最大支持 1MB的消息大小。对于大消息的传输
22、,需要修改 kafka 的 server.properties、consumer、producer 的相关配置。server.properties修改:修改:打开 C:kafkakafka_2.11-1.1.1config server.properties按照最大 1GBmessage.max.bytes=1073741824replica.fetch.max.bytes=1073741824log.segment.bytes=1073741824consumer 配置:配置:max.partition.fetch.bytes=1073741824Producer 配置:配置:max.req
23、uest.size=107374182420#33554432,默认32Mbuffer.memory=1073741824org.apache.kafka mon.errors.RecordTooLargeException:The message is 36428062 bytes when serialized which islarger than the total memory buffer you have configuredwith the buffer.memory configuration.附件太大可能会内存溢出,还会涉及超时参数配置等。2 2 JAVAJAVA程序例如程
24、序例如2.12.1 ProducerProducer 程序例如程序例如2.1.12.1.1 PropertiesProperties 文件配置文件配置#producerbootstrap.servers=XX.XX.0.12:19092,XX.XX.0.13:19092,XX.XX.0.14:19092,XX.XX.0.15:19092,XX.XX.0.16:19092producer.type=syncrequest.required.acks=1#consumerenable.auto mit=true21#latest,earliest,noneauto.offset.reset=ear
25、liest建议公共参数如服务地址配置在 properties 文件里。其他参数根据接口需要程序中配置。/创建Producerprivateprivate Producer createProducer()Properties props=newnew Properties();String path=ProducerDemo.classclass.getResource(/).getFile().toString()+kafka.properties;trytry FileInputStreamfis=newnew FileInputStream(newnewFile(path);props.
26、load(fis);props.put(key.serializer,org.apache.kafkamon.serialization.IntegerSerializer);props.put(value.serializer,org.apache.kafkamon.serialization.StringSerializer);fis.close();catchcatch(Exception e)e.printStackTrace();22 returnreturn newnew KafkaProducer(props);2.1.22.1.2 PropertiesProperties 配置
27、详解配置详解#0:producer#0:producer不会等待不会等待brokerbroker发送发送ackack#1:#1:当当leaderleader接收到消息后发送接收到消息后发送ackack#all(-1):#all(-1):当所有的当所有的followerfollower都同步消息成功后发送都同步消息成功后发送ackackrequest.required.acks=0request.required.acks=02.1.32.1.3 主题主题+VALUE+VALUEimportimport java.io.File;importimport java.io.FileInputStr
28、eam;importimport java.util.Properties;importimportorg.apache.kafka.clients.producer.KafkaProducer;importimport org.apache.kafka.clients.producer.Producer;importimportorg.apache.kafka.clients.producer.ProducerRecord;23publicpublic classclass TopicValue/创建Producerprivateprivate Producer createProducer
29、()Properties props=newnew Properties();String path=ProducerDemo.classclass.getResource(/).getFile().toString()+kafka.properties;trytry FileInputStream fis=newnewFileInputStream(newnew File(path);props.load(fis);props.put(key.serializer,org.apache.kafkamon.serialization.StringSerializer);props.put(va
30、lue.serializer,org.apache.kafkamon.serialization.StringSerializer);fis.close();catchcatch(Exception e)e.printStackTrace();24 returnreturn newnew KafkaProducer(props);publicpublic staticstatic voidvoid main(String args)/消息主题String topicName=test001;TopicValue topicValueProducer=newnew TopicValue();Pr
31、oducer producer=topicValueProducer.createProducer();producer.send(newnew ProducerRecord(topicName,消息:TopicValue);producer.flush();producer.close();System.out.println(Message send successfully);252.1.42.1.4 主题主题+KEY+VALUE+KEY+VALUE2.1.4.12.1.4.1 Integer,Stringpackage kjsp.kafka.producer;import java.i
32、o.File;import java.io.FileInputStream;import java.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;public class TopicIntegerString/创建Producerprivate Producer createProducer()
33、Properties props=new Properties();String path=ProducerDemo.class.getResource(/).getFile().toString()26 +kafka.properties;try FileInputStream fis=newFileInputStream(new File(path);props.load(fis);props.put(key.serializer,org.apache.kafkamon.serialization.IntegerSerializer);props.put(value.serializer,
34、org.apache.kafkamon.serialization.StringSerializer);fis.close();catch(Exception e)e.printStackTrace();return new KafkaProducer(props);public static void main(String args)/消息主题String topicName=test001;27TopicIntegerString topicValueProducer=newTopicIntegerString();Producer producer=topicValueProducer
35、.createProducer();producer.send(new ProducerRecord(topicName,1,消息:TopicIntegerString1);producer.flush();producer.close();System.out.println(Message send successfully);2.1.4.22.1.4.2 String,Stringimport java.io.File;import java.io.FileInputStream;import java.util.Properties;importorg.apache.kafka.cli
36、ents.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import28org.apache.kafka.clients.producer.ProducerRecord;public class TopicStringString/创建Producerprivate Producer createProducer()Properties props=new Properties();String path=ProducerDemo.class.getResource(/).getFile().t
37、oString()+kafka.properties;try FileInputStream fis=newFileInputStream(new File(path);props.load(fis);props.put(key.serializer,org.apache.kafkamon.serialization.StringSerializer);props.put(value.serializer,org.apache.kafkamon.serialization.StringSerializer);fis.close();29 catch(Exception e)e.printSta
38、ckTrace();return new KafkaProducer(props);public static void main(String args)/消息主题String topicName=test001;TopicStringString topicValueProducer=newTopicStringString();Producer producer=topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName,TopicStringString001,消息:TopicString
39、String001);producer.flush();producer.close();System.out.println(Message send successfully);302.1.4.32.1.4.3 String,bytepackage kjsp.kafka.producer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;import org.apache.ka
40、fka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;public class TopicStringByte/创建Producerprivate Producer createProducer()Properties props=new Properties();String path=ProducerDemo.class.getResource(/).getFile().toString()31 +kafka.properties;try FileInputStream fi
41、s=newFileInputStream(new File(path);props.load(fis);props.put(key.serializer,org.apache.kafkamon.serialization.StringSerializer);props.put(value.serializer,org.apache.kafkamon.serialization.ByteArraySerializer);fis.close();catch(Exception e)e.printStackTrace();return new KafkaProducer(props);public
42、static void main(String args)/消息主题String topicName=test001;32TopicStringByte topicValueProducer=newTopicStringByte();Producer producer=topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName,TopicStringByte001,消息:TopicStringByte001.getBytes();producer.flush();producer.close();
43、System.out.println(Message send successfully);2.1.4.42.1.4.4 byte,bytepackage kjsp.kafka.producer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;33import org.apache.kafka.clients.producer.Producer;importorg.apache.
44、kafka.clients.producer.ProducerRecord;public class TopicByteByte/创建Producerprivate Producer createProducer()Properties props=new Properties();String path=ProducerDemo.class.getResource(/).getFile().toString()+kafka.properties;try FileInputStream fis=newFileInputStream(new File(path);props.load(fis);
45、props.put(key.serializer,org.apache.kafkamon.serialization.ByteArraySerializer);props.put(value.serializer,org.apache.kafka34mon.serialization.ByteArraySerializer);fis.close();catch(Exception e)e.printStackTrace();return new KafkaProducer(props);public static void main(String args)/消息主题String topicN
46、ame=test001;TopicByteByte topicValueProducer=newTopicByteByte();Producer producer=topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName,TopicByteByte001.getBytes(),消息:TopicByteByte001.getBytes();producer.flush();producer.close();System.out.println(Message send successfully);
47、352.1.4.52.1.4.5发发送文件消息送文件消息/*/package kjsp.kafka.producer;import java.io.BufferedOutputStream;import java.io.ByteArrayOutputStream;import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import org.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.prod
48、ucer.ProducerRecord;/*author Develop36*发送消息类*/public class SendMsgFileTest/根据文件名获取字节数组public static byte getFileBytes(String fileName)byte buffer=null;FileInputStream fis=null;ByteArrayOutputStream bos=null;try File file=new File(fileName);fis=new FileInputStream(file);long fileSize=file.length();if
49、(fileSizeInteger.MAX_VALUE)System.out.println(文件太大,无法处理!);return null;bos=new ByteArrayOutputStream(int)fileSize);byte b=new byte1024;int len=0;while(len=fis.read(b,0,1024)!=-1)37bos.write(b,0,len);buffer=bos.toByteArray();catch(Exception ex)ex.printStackTrace();finally if(bos!=null)try bos.close();
50、catch(Exception ex)ex.printStackTrace();if(fis!=null)try fis.close();catch(Exception ex)ex.printStackTrace();return buffer;38public static void main(String args)try/消息主题String topicName=test001;TopicStringByte topicValueProducer=newTopicStringByte();Producer producer=topicValueProducer.createProduce