`
javafan_303
  • 浏览: 951131 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Kafka实践:通过8个case学习Kafka的功能

 
阅读更多

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Apache Kafka与传统消息系统相比,有以下不同:

  • 它被设计为一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;
  • 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

我们将通过几个case学习Kafka的功能。

这几个case是我在工作中解决一个问题而对Kafka的功能进行的调研。另外还做了性能的测试,在本文中没有提到。

  • 分区
    • case1: 同一个group的consumer并发处理消息
    • case2: 关闭一个consumer,消息会继续由另一个consumer处理
    • case3: 恢复consumer,会balance回来
  • offset
    • case4: auto.commit = true
    • case5: offset out of range
    • case6: auto.commit = false
  • delay
    • case7: delay功能
    • case8: 容错,是否会丢失数据

启动Kafka boker,创建Topic

启动kafka:

1
bin/kafka-server-start.sh config/server.properties

查看topic list:

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看特定的topic

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic colobu

创建topic:

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

High-level consumer实现

Producer实现

case 1

启动consumer 1:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 grouo1 colobu 2 > 1.txt

启动consumer 2:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 8 > 2.txt

发送 10000 个消息:

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 10000 colobu

查看每个consumer处理的messsage数量:

1
2
3
4
5
cat 1.txt|wc -l
2024
 
cat 2.txt|wc -l
7976

总数为 2024+7976=10000, 没有message丢失, 处理比大致为2024:7976 = 1:4,和我们的预想是一样的。

case 2 & case 3:

启动consumer 1:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 grouo1 colobu 5 > 1.txt

启动consumer 2:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 5 > 2.txt

发送 1000000 个消息:

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 500000 colobu

CTRL + C停掉 consumer1, 可以看到 consumer2 继续工作, consumer1停止了工作:

1
2
3
tail -f 1.txt
 
tail -f 2.txt

重新启动 consumer1,将处理:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 5 >> 1.txt

可以看到 consumer1 和 consumer2 继续工作:

1
tail -f 1.txt

等处理完我们检查一下consumer1和consumer2处理的消息总数,看是否有消息丢失:

1
2
cat *.txt|wc -l
500504

看起来我们处理的消息似乎多了一些,相信这是在consumer1重启的时候consumer2的offset还没来得及commit。

case 4

offset的值保存在zookeeper的如下节点中:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

可以看到不同的topic不同的分区有着自己的offset, 并且不同组分别拥有自己独立的offset。

这里有一个简单的工具可以监控offset: KafkaOffsetMonitor,
运行下面的命令就可以启动一个web界面来监控:

1
2
3
4
5
6
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zk-server1,zk-server2 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days

上面我们已经做了实验, 所以在zookeeper应该有offset的值,你可以用zkCli.sh或者上面的命令查询一下:

1
2
get /consumers/group1/offsets/colobu/3
39919

因为auto.commit.enable默认为true, consumer会定时地将offset写入到zookeeper上(时间间隔由auto.commit.interval.ms决定,默认1分钟).

case 5

根据Kafka的文档, 参数auto.offset.reset的功能如下所示:

what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer

如果offset is out of range, 这个参数决定了comsumer的行为:

smallest, 将offset设为当前所能用的最小的offset。 注意不一定是0。
largest, 将offset设为当前可用的最大的offset。也就是consumer将只处理最新写入的消息。 默认值。
其它, 校验失败,抛出kafka.common.InvalidConfigException异常。 看起来只能设置这两个值

我们可以将zookeeper的 /consumers节点删除::

1
[zk: localhost:2181(CONNECTED) 15] rmr /consumers

然后将的代码中加入一行

1
props.put("auto.offset.reset", "smallest");

或者

..

1
2
 
props.put("auto.offset.reset", "largest");

如果设置为largest,可以看到consumer不会处理以前发送的消息,只会处理新进的消息。
如果设置为smallest,consumer从头开始所有的消息。

case 6

默认情况下auto.commit.enable等于true,这也就意味着consumer会定期的commit offset。 前面也介绍了, zookeeper中节点中记录这这些offset。
我们也可以手工进行commit。

首先我们先将zookeeper中把/consumers节点删除掉。
然后在ConsumerExample.java中增加下面一行:

1
props.put("auto.commit.enable", "false");

然后启动consumer:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 10

过几分钟检查zookeeper的/consumers/group1节点,发现并没有offsets节点。
这符合我们的期望,因为我们将自动提交设为false。

修改ConsumerTest.javarun方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
int count = 0;
while (it.hasNext()) {
//格式: 处理时间,线程编号:消息
System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + new String(it.next().message()));
count++;
 
if (count == 100) {
consumer.commitOffsets();
count = 0;
}
}
 
System.out.println("Shutting down Thread: " + threadNumber);
 
}

当处理了100个消息后就commit offset一次。
启动consumer,只运行一个线程,这样我们可以直观的感受到commit.如果多个线程, 我们不能精确确定应该发多少个消息才能使某个线程处理100个消息。

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 1

先用producer发送99个消息,

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 99 colobu

查看zookeeper的节点/consumers/group1下依然没有offsets节点。
再发送一个消息,

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 1 colobu

可以看到zookeeper的节点/consumers/group1下增加了offsets节点。

素以如果你有必要,你也可以手工控制offset的commit的时机。

case 7

这是我现在想利用Kafka实现的一个分布式DelayQueue的功能。
在我的文章跟着实例学习ZooKeeper的用法:讲到,利用zookeeper可以实现Distributed Delay Queue,但是Curator的作者也讲了, zookeeper真心不适合做queue,尤其在数据量很大的情况。

可以利用redis的sorted set实现: Delay queue in redis,
或者使用其它的一些框架实现。

这里我利用Kafka实现这样的一个功能。
实现方式大概和这个帖子相同: Delayed Queue

Each produced message should have a timestamp at which it was pushed to the queue. At the
consumer side, fetch a message from a partition and compare the message timestamp with system's timestamp to see if enough time has passed for you
to process the message. If enough time has passed, process the message and commit the message's offset otherwise make sure you do not commit the
offset.

我们将ConsumerTest的run方法修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
 
while (it.hasNext()) {
//格式: 处理时间,线程编号:消息
String msg = new String(it.next().message());
long t = System.currentTimeMillis() - Long.parseLong(msg.substring(0, msg.indexOf(",")));
 
if (t < 5 * 60 *1000) {
try {
Thread.currentThread().sleep(5 * 60 *1000 -t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg);
}
 
System.out.println("Shutting down Thread: " + threadNumber);
 
}

然后用producer发送100消息:

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 100 colobu

可以看到consumer5分钟后才处理这100个数据。

看起来方案可行。

case 8

注意offset是自动commit的。 在上面的例子中,如果sleep的过程中consumer重启,是否会有message丢失或者重复处理呢?
Kafka默认往zookeeper上写offset的频率是10秒。

查看Kafka的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
 
package kafka.consumer
 
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
import kafka.message.{MessageAndOffset, MessageAndMetadata}
import kafka.common.{KafkaException, MessageSizeTooLargeException}
 
 
/**
* An iterator that blocks until a value can be read from the supplied queue.
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
 
override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
if(consumedOffset < 0)
throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
item
}
 
......
}

只有在next获取下一个消息的情况下才会更新consumedOffset,这样的话最多只有一个消息被读取还没来得及处理就因为程序crash而丢掉了。 在我们可以接受的范围之内。

其它

InfoQ网站有几篇Kafka文章挺好的,如

 

转载自

http://colobu.com/2015/03/12/kafka-in-practice/

分享到:
评论

相关推荐

    test_case_kafka:kafka测试用例

    test_case_kafka 这里介绍了如何在windows下安装kakfa,并且提供了单元测试程序进行生产消费。 1.解压kafka_2.12-1.0.1 2.修改kafka_2.12-1.0.1\config\zookeeper.properties文件 dataDir=D:\zookeeperlog 3.修改...

    kafka-case:定制的Kafka生产者和消费者

    自定义Kafka Producer和Consumer应用程序。 文章: : 目标: 创建自己的CustomConsumer。 创建自己的CustomProducer,以使用Twitter API自动生成并向主题发送至少10条消息。 使用自定义名称空间,以避免与其他...

    Kafka_The Definitive Guide_Real-Time Data and Stream Processing at Scale-2017

    We also focused on making the book general and comprehensive enough so it will be useful to anyone using Kafka, no matter the use case or architecture. We cover practical matters such as how to ...

    Creating.Maintainable.APIs.A.Practical.Case-Study.Approach

    Chapter 8: Implementing Synchronous and Asynchronous REST APIs Chapter 9: Documenting REST APIs Chapter 10: Testing REST APIs Chapter 11: Implementing Messaging APIs Chapter 12: Apache Kafka as a ...

    Fast.Data.Processing.Systems.with.SMACK.Stack

    Key Features This highly practical guide shows you how to use the best of the big data technologies to solve your ...Chapter 8: Study Case 2 - Connectors Chapter 9: Study Case 3 - Mesos and Docker

    Architecting HBase Applications

    Along with HBase principles and cluster deployment guidelines, this book includes in-depth case studies that demonstrate how large companies solved specific use cases with HBase. Authors Jean-Marc ...

    Next-Generation Big Data: A Practical Guide to Apache Kudu, Impala, and Spark

    Integrate HBase, Solr, Oracle, SQL Server, MySQL, Flume, Kafka, HDFS, and Amazon S3 with Apache Kudu, Impala, and Spark Use StreamSets, Talend, Pentaho, and CDAP for real-time and batch data ...

    SmartMoneyTracker

    例如,您可以通过卖出看涨期权的看涨期权和看跌期权来构建零美元期权交易,以有效地实现。 但是,某些策略(例如,出售不受保护的期权)可能会导致无限的风险。 结果,值得遵循由机构投资者,市场专家和其他金融...

    Big.Data.Analytics.with.Spark.and.Hadoop.17858

    Real-time data analytics using Spark Streaming with Apache Kafka and HBase is covered to help building streaming applications. New Structured streaming concept is explained with an IOT (Internet of ...

    JavaScript and JSON Essentials2018

    Chapter 1, Getting ...Chapter 12, Case Studies in JSON, is a case study on how JSON is enhanced for different domains, taking into consideration the various benefits it provided after implantation.

    vaccine-gitops:Gitops用于疫苗解决方案的环境和应用程序部署

    该树具有以下结构,其中的environment用于定义不同的Kafka风味,Postgresql,Apicurio,并且apps文件夹用于定义根据用例(冷链,优化订单,异常检测)使用的应用程序组件。 应用程序结构 我们可以为当前解决方案中...

    RealtimePlatformforSecondLookUseCaseusingSparkandKafka.pdf

    在SPARK SUMMIT 2017上,Ivy Lu, Capital One分享了题为《Real-time Platform for Second Look Use Case using Spark and Kafka》,就实时 批量数据,检查点,社交媒体反馈等方面的内容做了深入的分析。

    Pro Spark Streaming,The Zen of Real-time Analytics using Apache Spark

    Finally, these applications can use out-of-the- box integrations with other systems such as Kafka, Flume, HBase, and Cassandra. All of these features have turned Spark Streaming into the Swiss Army ...

Global site tag (gtag.js) - Google Analytics