
主流流处理框架比较.doc
18页word分布式流处理是对无边界数据集进展连续不断的处理、聚合和分析它跟MapReduce一样是一种通用计算,但我们期望延迟在毫秒或者秒级别这类系统一般采用有向无环图〔DAG〕DAG是任务链的图形化表示,我们用它来描述流处理作业的拓扑如如下图,数据从sources流经处理任务链到sinks单机可以运行DAG,但本篇文章主要聚焦在多台机器上运行DAG的情况关注点当选择不同的流处理系统时,有以下几点需要注意的:· 运行时和编程模型:平台框架提供的编程模型决定了许多特色功能,编程模型要足够处理各种应用场景这是一个相当重要的点,后续会继续· 函数式原语:流处理平台应该能提供丰富的功能函数,比如,map或者filter这类易扩展、处理单条信息的函数;处理多条信息的函数aggregation;跨数据流、不易扩展的操作join· 状态管理:大局部应用都需要保持状态处理的逻辑流处理平台应该提供存储、访问和更新状态信息· 消息传输保障:消息传输保障一般有三种:at most once,at least once和exactly onceAt most once的消息传输机制是每条消息传输零次或者一次,即消息可能会丢失;A t least once意味着每条消息会进展屡次传输尝试,至少一次成功,即消息传输可能重复但不会丢失;Exactly once的消息传输机制是每条消息有且只有一次,即消息传输既不会丢失也不会重复。
· 容错:流处理框架中的失败会发生在各个层次,比如,网络局部,磁盘崩溃或者节点宕机等流处理框架应该具备从所有这种失败中恢复,并从上一个成功的状态〔无脏数据〕重新消费· 性能:延迟时间〔Latency〕,吞吐量〔Throughput〕和扩展性〔Scalability〕是流处理应用中极其重要的指标平台的成熟度和承受度:成熟的流处理框架可以提供潜在的支持,可用的库,甚至开发问答帮助选择正确的平台会在这方面提供很大的帮助运行时和编程模型运行时和编程模型是一个系统最重要的特质,因为它们定义了表达方式、可能的操作和将来的局限性因此,运行时和编程模型决定了系统的能力和适用场景实现流处理系统有两种完全不同的方式:一种是称作原生流处理,意味着所有输入的记录一旦到达即会一个接着一个进展处理第二种称为微批处理把输入的数据按照某种预先定义的时间间隔〔典型的是几秒钟〕分成短小的批量数据,流经流处理系统两种方法都有其先天的优势和不足首先以原生流处理开始,原生流处理的优势在于它的表达方式数据一旦到达立即处理,这些系统的延迟性远比其它微批处理要好除了延迟性外,原生流处理的状态操作也容易实现,后续将详细讲解一般原生流处理系统为了达到低延迟和容错性会花费比拟大的本钱,因为它需要考虑每条记录。
原生流处理的负载均衡也是个问题比如,我们处理的数据按key分区,如果分区的某个key是资源密集型,那这个分区很容易成为作业的瓶颈接下来看下微批处理将流式计算分解成一系列短小的批处理作业,也不可防止的减弱系统的表达力像状态管理或者join等操作的实现会变的困难,因为微批处理系统必须操作整个批量数据并且,batch interval会连接两个不易连接的事情:根底属性和业务逻辑相反地,微批处理系统的容错性和负载均衡实现起来非常简单,因为微批处理系统仅发送每批数据到一个worker节点上,如果一些数据出错那就使用其它副本微批处理系统很容易建立在原生流处理系统之上编程模型一般分为组合式和声明式组合式编程提供根本的构建模块,它们必须严密结合来创建拓扑新的组件经常以接口的方式完成相对应地,声明式API操作是定义的高阶函数它允许我们用抽象类型和方法来写函数代码,并且系统创建拓扑和优化拓扑声明式API经常也提供更多高级的操作〔比如,窗口函数或者状态管理〕后面很快会给出样例代码主流流处理系统有一系列各种实现的流处理框架,不能一一列举,这里仅选出主流的流处理解决方案,并且支持Scala API因此,我们将详细介绍Apache Storm,Trident,Spark Streaming,Samza和Apache Flink。
前面选择讲述的虽然都是流处理系统,但它们实现的方法包含了各种不同的挑战这里暂时不讲商业的系统,比如Google MillWheel或者Amazon Kinesis,也不会涉与很少使用的Intel GearPump或者Apache ApexApache Storm最开始是由Nathan Marz和他的团队于2010年在数据分析公司BackType开发的,后来BackType公司被Twitter收购,接着Twitter开源Storm并在2014年成为Apache顶级项目毋庸置疑,Storm成为大规模流数据处理的先锋,并逐渐成为工业标准Storm是原生的流处理系统,提供low-level的APIStorm使用Thrift来定义topology和支持多语言协议,使得我们可以使用大局部编程语言开发,Scala自然包括在Trident是对Storm的一个更高层次的抽象,Trident最大的特点以batch的形式进展流处理Trident简化topology构建过程,增加了窗口操作、聚合操作或者状态管理等高级操作,这些在Storm中并不支持相对应于Storm的At most once流传输机制,Trident提供了Exactly once传输机制。
Trident支持Java,Clojure和Scala当前Spark是非常受欢迎的批处理框架,包含Spark SQL,MLlib和Spark StreamingSpark的运行时是建立在批处理之上,因此后续参加的Spark Streaming也依赖于批处理,实现了微批处理接收器把输入数据流分成短小批处理,并以类似Spark作业的方式处理微批处理Spark Streaming提供高级声明式API〔支持Scala,Java和Python〕Samza最开始是专为LinkedIn公司开发的流处理解决方案,并和LinkedIn的Kafka一起贡献给社区,现已成为根底设施的关键局部Samza的构建严重依赖于基于log的Kafka,两者严密耦合Samza提供组合式API,当然也支持Scala最后来介绍Apache FlinkFlink是个相当早的项目,开始于2008年,但只在最近才得到注意Flink是原生的流处理系统,提供high level的APIFlink也提供API来像Spark一样进展批处理,但两者处理的根底是完全不同的Flink把批处理当作流处理中的一种特殊情况在Flink中,所有的数据都看作流,是一种很好的抽象,因为这更接近于现实世界。
快速的介绍流处理系统之后,让我们以下面的表格来更好清晰的展示它们之间的不同:Word CountWordcount之于流处理框架学习,就好比hello world之于编程语言学习它能很好的展示各流处理框架的不同之处,让我们从Storm开始看看如何实现Wordcount: TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new Split(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); ... Map
第二行代码定义一个spout,作为数据源然后是一个处理组件bolt,分割文本为单词接着,定义另一个bolt来计算单词数〔第四行代码〕也可以看到魔数5,8和12,这些是并行度,定义集群每个组件执行的独立线程数第八行到十五行是实际的WordCount bolt实现因为Storm不支持建的状态管理,所有这里定义了一个局部状态按之前描述,Trident是对Storm的一个更高层次的抽象,Trident最大的特点以batch的形式进展流处理除了其它优势,Trident提供了状态管理,这对wordcount实现非常有用public static StormTopology buildTopology(LocalDRPC drpc) { FixedBatchSpout spout = ... TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"),new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); ... }如你所见,上面代码使用higher level操作,比如each〔第七行代码〕和groupby〔第八行代码〕。
并且使用Trident管理状态来存储单词数〔第九行代码〕下面是时候祭出提供声明式API的Apache Spark记住,相对于前面的例子,这些代码相当简单,几乎没有冗余代码下面是简单的流式计算单词数:val conf = new SparkConf().setAppName("wordcount")val ssc = new StreamingContext(conf, Seconds(1))val text = ...val counts = text.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _)counts.print()ssc.start()ssc.awaitTermination()每个Spark Streaming的作业都要有StreamingContext,它是流式函数的入口StreamingContext加载第一行代码定义的配置conf,但更重要地,第二行代码定义batch interval〔这里设置为1秒〕第六行到八行代码是整个单词数计算这些是标准的函数式代码,Spark定义topology并且分布式执行。
第十二行代码是每个Spark Streaming作业最后的局部:启动计算记住,Spark Streaming作业一旦启动即不可修改接下来看。
