broker:
message.max.bytes = 1048576
replica.fetch.max.bytes = 1048576
producer:
max.request.size = 1048576
consumer:
#old consumer
fetch.message.max.bytes = 1048576
#new consumer
max.partition.fetch.bytes = 1048576
只有一个partition,飞行消息设为最大1
直接开启幂等,数据库或者支持主机的存储系统
Kafka 目前总共演进了 7 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0,其中的小版本和 Patch 版本很多
尽量保持服务器端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。
最后我合并说下 1.0 和 2.0 版本吧,因为在我看来这两个大版本主要还是 Kafka Streams的各种改进,在消息引擎方面并未引入太多的重大功能特性。Kafka Streams 的确在这两个版本有着非常大的变化,也必须承认 Kafka Streams 目前依然还在积极地发展着。如果你是 Kafka Streams 的用户,至少选择 2.0.0 版本吧。
幂等和事务是Kafka 0.11.0.0版本引入的两个特性,以此来实现EOS(exactly once semantics,精确一次处理语义)。
幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能之后就可以避免这种情况。
0.10.0.0 是里程碑式的大版本,因为该版本引入了 Kafka Streams。
查看索引内容:
bin/kafka-dump-log.sh --files /tmp/kafka-logs/ topic-log-0/00000000000000000000.index
偏移量索引项的格式如下图所示。每个索引项占用8个字节,分为两个部分。
relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占用4个字节,当前索引文件的文件名即为 baseOffset 的值。
position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。
Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。
时间戳索引项的格式如下图所示。
每个索引项占用12个字节,分为两个部分。
timestamp:当前日志分段最大的时间戳。
relativeOffset:时间戳所对应的消息的相对偏移量。
Kafka总结
总的来说Kafka快的原因:
1、partition顺序读写,充分利用磁盘特性,这是基础;
2、Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
3、Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。
mmap 和 sendfile总结
1、都是Linux内核提供、实现零拷贝的API;
2、sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
3、mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。
参考:https://zhuanlan.zhihu.com/p/78335525
Producer在生产发送消息时,难免会重复发送消息。Producer进行retry时会产生重试机制,发生消息重复发送。而引入幂等性后,重复发送只会生成一条有效的消息。Kafka作为分布式消息系统,它的使用场景常见与分布式系统中,比如消息推送系统、业务平台系统(如物流平台、银行结算平台等)。以银行结算平台来说,业务方作为上游把数据上报到银行结算平台,如果一份数据被计算、处理多次,那么产生的影响会很严重。
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用途是什么呢?
ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
produceid 默认是进程pid
bin/kafka-console-consumer.sh --topic soa-detail-local --bootstrap-server ?:9092