而push情势则会把数据推到输入端口后平素进入程序

 
 再有两天就进入2018了,想想仍旧要预备一下过年的劳作主旋律。记念当年起先上学函数式编程时的基本点目标是想设计一套标准API給这么些习惯了OOP格局支付商业利用软件的程序员们,使他们能用一序列似传统数据库软件编程的主意来落实多线程,并行运算,分布式的数目处理应用程序,前提是这种编程模式不需要对函数式编程语言、多线程软件编程以及集群环境下的分布式软件编程情势有很高的经历要求。前边试着揭橥了一个按照scalaz-stream-fs2的多寡处理工具开源项目。该品种为主落实了多线程的数据库数据并行处理,能充足利用域内服务器的多核CPU环境以streaming,non-blocking形式加强数据处理效能。目前刚形成了对一切akka套装(suite)的摸底,感觉akka是一套精美的分布式编程工具:一是actor模式提供了多种多线程编程模式,再不怕akka-cluster能轻松地促成集群式的分布式编程,而集群环境变化只需要调整安排文件,无需改变代码。akka-stream是一套效能更是完整和有力的streaming工具库,那么只要以akka-stream为底蕴,设计一套能在集群环境里开展分布式多线程并行数据处理的开源编程工具应该可以是2018的重要任务。同样,用户仍是可以够遵照他们深谙的数据库应用编程模式轻松实现分布式多线程并行数据处理程序的付出。

 
 再有两天就进去2018了,想想依然要准备一下过年的劳作方向。回想当年始于读书函数式编程时的最重要目标是想设计一套标准API給这些习惯了OOP模式开发商业利用软件的程序员们,使他们能用一序列似传统数据库软件编程的章程来促成多线程,并行运算,分布式的多寡处理应用程序,前提是这种编程形式不需要对函数式编程语言、多线程软件编程以及集群环境下的分布式软件编程模式有很高的经验要求。前边试着发表了一个按照scalaz-stream-fs2的数码处理工具开源项目。该类型为主落实了多线程的数据库数据并行处理,能充裕利用域内服务器的多核CPU环境以streaming,non-blocking情势提升数据处理效用。最近刚形成了对任何akka套装(suite)的垂询,感觉akka是一套精美的分布式编程工具:一是actor格局提供了多种多线程编程格局,再不怕akka-cluster能轻松地贯彻集群式的分布式编程,而集群环境转变只需要调动部署文件,无需改变代码。akka-stream是一套效用更是完整和强有力的streaming工具库,那么一旦以akka-stream为底蕴,设计一套能在集群环境里展开分布式多线程并行数据处理的开源编程工具应该可以是2018的首要任务。同样,用户还是可以够依照他们熟练的数据库应用编程情势轻松实现分布式多线程并行数据处理程序的支付。

   
在大数量程序流行的前日,许多程序都面临着一同的难题:程序输入数据趋于无限大,抵达时间又不确定。一般的化解模式是拔取回调函数(callback-function)来促成的,但这样的缓解方案很容易造成“回调地狱(callback
hell)”,即所谓的“goto-hell”:程序控制跳来跳去很难跟踪,特别是一些变量如果在回调函数中改变后发出不可预料的结果。数据流(stream)是一种缓解问题的管用编程方式。Stream是一个抽象概念,能把程序数据输入过程和其他细节隐蔽起来,通过讲明情势把数量处理过程描述出来,使全部程序逻辑更易于了解跟踪。当然,牺牲的是对一部分运算细节的控制能力。咱们在前边介绍过scalaz-stream,它与akka-stream的关键区别在于:

 
 我把一般中小集团的IT系统分成两大一部分:一是实时的多少搜集(输入)部分,二是批量数量抽取、分析、处理部分。为了让传统中小型集团IT软件编程人士能开发服务器集群环境上多少平台(如云端数据平台)运行的软件系统,我打算通过这些DSP(Streaming-Data-Processor)项目来实现地点提到的第二有些。第一有些可以用CQRS(Command-Query-Responsibility-Separation)即读写分离架构和事件记录(event-sourcing)情势来落实一种高效便捷响应、安全祥和运转的数据搜集系统。这一部分我会在完成SDP项目后以akka-persistence为主导,通过akka-http,AMQP如RabitMQ等技巧来贯彻。

 
 我把一般中小公司的IT系统分成两大一些:一是实时的多少搜集(输入)部分,二是批量数额抽取、分析、处理部分。为了让传统中小型公司IT软件编程人士能开发服务器集群环境上多少平台(如云端数据平台)运行的软件系统,我打算通过这个DSP(Streaming-Data-Processor)项目来贯彻地点提到的第二局部。第一局部可以用CQRS(Command-Query-Responsibility-Separation)即读写分离架构和事件记录(event-sourcing)形式来实现一种高效快捷响应、安全稳定运转的数量收集系统。这有些我会在成就SDP项目后以akka-persistence为主干,通过akka-http,AMQP如RabitMQ等技巧来落实。

1、scalaz-stream是pull形式的,而akka-stream是push形式的。pull模式的缺陷是接收数据功用问题,因为在这种格局里先后必须不断重复检测(polling)输入端口是否有多少存在。而push格局则会把数据推到输入端口后直接进去程序,但假如数量源头动作太快程序不可以及时处理所有推送的数码时就会导致所谓的数额溢出问题,遗失数据。可是akka-stream实现了reactive-stream的back-pressure规范:数据发送方和接收方之间相互提醒,使过快的多寡爆发能按接收方要求慢下来甚至临时停下来。

 
按一般的scala和akka的编程模式编写多线程分布式数据库管理软件时一是要遵照akka代码格局,使用scala编程语言的一些较深的语法;二是急需涉及异步Async调用,集群Cluster节点任务安排及Streaming对外集成actor运算格局的底细,用户需要持有一定的scala,akka使用经验。再接下来就需要按业务流程把各业务环节分解成不借助于顺序的功用模块,然后把这些分拆出来的效益分派给集群中不同的节点上去运算处理。而对于SDP用户来说,具备最基本的scala知识,无需精通akka、actor、threads、cluster,只要按照SDP自定义的事体处理流形式就足以编制多线程分布式数据处理程序了。下边我就用部分文字及伪代码来叙述一下SDP的布局和效用:

 
按一般的scala和akka的编程格局编写多线程分布式数据库管理软件时一是要按部就班akka代码格局,使用scala编程语言的一对较深的语法;二是索要涉及异步Async调用,集群Cluster节点任务布置及Streaming对外集成actor运算格局的细节,用户需要具有一定的scala,akka使用经验。再接下来就需要按业务流程把各工作环节分解成不借助顺序的功用模块,然后把这多少个分拆出来的功效分派给集群中不同的节点上去运算处理。而对此SDP用户来说,具备最主题的scala知识,无需了解akka、actor、threads、cluster,只要按照SDP自定义的工作处理流情势就可以编写多线程分布式数据处理程序了。上边我就用一些文字及伪代码来描述一下SDP的构造和意义:

2、scalaz-sstream和akka-stream的数据流都是一种注明式的数额处理流程描述,属于一种运算方案,最后都急需某种运算器来对数据流按运算方案展开具体的演算,得出运算结果和暴发副效率。scalaz-stream的运算器是自备的函数式程序,特点是能很好的操纵线程使用和拓展相互运算。akka-stream的运算器是materializer。materializer在actor系统上运行,具备了actor格局先后的长处包括:信息使得、集群运算、监管政策(SupervisorStrategy)等等。

完全来说SDP是由一或五个Stream组成的;每个Stream就代表一段程序。一段完整的顺序Stream是由流元素源Source、处理节点Process-Node(Flow)及数码输出终点Sink六个环节组成,下面是一个名列三甲的主次框架:

一体化来说SDP是由一或两个Stream组成的;每个Stream就代表一段程序。一段完整的主次Stream是由流元素源Source、处理节点Process-Node(Flow)及数量输出终点Sink六个环节组成,下边是一个超人的次第框架:

akka-stream的数据流是由三类基础零部件组合而成,不同的咬合格局意味着不同的多少处理及发布功用。三类组件分别是:

  def load(qry: Query): PRG[R,M] = ???
  def process1: PRG[R,M] = ???
  def process2: PRG[R,M] = ???
  def recursiveProcess(prg: PRG[R,M]): PRG[R,M] = ???
  def results: PRG = ???

  load(qryOrders).process1.process2.recursiveProcess(subprogram).results.run
  def load(qry: Query): PRG[R,M] = ???
  def process1: PRG[R,M] = ???
  def process2: PRG[R,M] = ???
  def recursiveProcess(prg: PRG[R,M]): PRG[R,M] = ???
  def results: PRG = ???

  load(qryOrders).process1.process2.recursiveProcess(subprogram).results.run

1、Source:数据源。akka-stream属于push情势,所以Source也就是Publisher(数据宣布方),Source的形态SourceShape代表只有一个输出端口的形象。Source可以从单值、集合、某种Publisher或另一个数据羊水栓塞生数据流的元素(stream-element),包括:

从下边的示范中我们得以见到有着定义的函数都暴发PRG[R,M]连串结果。其中R类型就是stream的因素,它流动贯穿了先后的保有环节。就像下水道网络运行规律一样:污水由源头Source流入终点Sink,在旅途可能由此四个污水处理节点Node。每一个节点代表对管道中流动污水处理的不二法门,包括分叉引流、并叉合流、添加化学物质、最后通过极端把拍卖过的水向外输出。在PRG中流淌的R类型可能是数据如数据库表的一行,又或者是一条Sring类型的query如plain-sql,可以用JDBC来运转。cassandra的CQL也是String类型的。Slick,Quill,ScalikeJDBC和有些别样ORM的Query都足以生出plain-sql。

从下面的言传身教中大家可以见到有着定义的函数都暴发PRG[R,M]项目结果。其中R类型就是stream的要素,它流动贯穿了先后的兼具环节。就像下水道网络运行规律一样:污水由源头Source流入终点Sink,在中途可能因而四个污水处理节点Node。每一个节点代表对管道中流动污水处理的艺术,包括分叉引流、并叉合流、添加化学物质、最终经过终点把拍卖过的水向外输出。在PRG中流淌的R类型可能是数量如数据库表的一行,又或者是一条Sring类型的query如plain-sql,可以用JDBC来运行。cassandra的CQL也是String类型的。Slick,Quill,ScalikeJDBC和一部分其他ORM的Query都足以生出plain-sql。

  /**
   * Helper to create [[Source]] from `Iterable`.
   * Example usage: `Source(Seq(1,2,3))`
   *
   * Starts a new `Source` from the given `Iterable`. This is like starting from an
   * Iterator, but every Subscriber directly attached to the Publisher of this
   * stream will see an individual flow of elements (always starting from the
   * beginning) regardless of when they subscribed.
   */
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
    single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)

  /**
   * Create a `Source` with one element.
   * Every connected `Sink` of this stream will see an individual stream consisting of one element.
   */
  def single[T](element: T): Source[T, NotUsed] =
    fromGraph(new GraphStages.SingleSource(element))

  /**
   * Helper to create [[Source]] from `Iterator`.
   * Example usage: `Source.fromIterator(() => Iterator.from(0))`
   *
   * Start a new `Source` from the given function that produces anIterator.
   * The produced stream of elements will continue until the iterator runs empty
   * or fails during evaluation of the `next()` method.
   * Elements are pulled out of the iterator in accordance with the demand coming
   * from the downstream transformation steps.
   */
  def fromIterator[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] =
    apply(new immutable.Iterable[T] {
      override def iterator: Iterator[T] = f()
      override def toString: String = "() => Iterator"
    })

  /**
   * Starts a new `Source` from the given `Future`. The stream will consist of
   * one element when the `Future` is completed with a successful value, which
   * may happen before or after materializing the `Flow`.
   * The stream terminates with a failure if the `Future` is completed with a failure.
   */
  def fromFuture[T](future: Future[T]): Source[T, NotUsed] =
    fromGraph(new FutureSource(future))

  /**
   * Helper to create [[Source]] from `Publisher`.
   *
   * Construct a transformation starting with given publisher. The transformation steps
   * are executed by a series of [[org.reactivestreams.Processor]] instances
   * that mediate the flow of elements downstream and the propagation of
   * back-pressure upstream.
   */
  def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
    fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))

  /**
   * A graph with the shape of a source logically is a source, this method makes
   * it so also in type.
   */
  def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
    case s: Source[T, M]         ⇒ s
    case s: javadsl.Source[T, M] ⇒ s.asScala
    case other ⇒ new Source(
      LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
      other.shape)
  }

Source是一段程序的起始有些。一般的话Source是经过运算Query爆发一串数据行或者人工构建而成。Source也可以并行运算Query暴发,然后合并成一条无序的数据源,如下伪代码的序列:

Source是一段程序的起初有的。一般的话Source是通过运算Query爆发一串数据行或者人工构建而成。Source也得以互相运算Query暴发,然后合并成一条无序的数据源,如下伪代码的档次:

下边还有多少个特其它Source:

  def load_par(qrys: Query*): PRG[R,M] = ???
  def load_par(qrys: Query*): PRG[R,M] = ???
  /**
   * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
   */
  def empty[T]: Source[T, NotUsed] = _empty
  private[this] val _empty: Source[Nothing, NotUsed] =
    Source.fromGraph(EmptySource)

  /**
   * Create a `Source` that will continually emit the given element.
   */
  def repeat[T](element: T): Source[T, NotUsed] = {
    val next = Some((element, element))
    unfold(element)(_ ⇒ next).withAttributes(DefaultAttributes.repeat)
  }

  /**
   * Creates [[Source]] that will continually produce given elements in specified order.
   *
   * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements
   * will continue infinitely by repeating the sequence of elements provided by function parameter.
   */
  def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = {
    val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten
    fromIterator(() ⇒ iterator).withAttributes(DefaultAttributes.cycledSource)
  }

Process-Node是SDP最关键的一个组成部分,因为大部分用户定义的各类事情职能是在此地运算的。用户可以选择对作业职能拓展拆分然后分担给不同的线程或不同的集群节点举办多线程并行或分布式的演算。SDP应该为用户程序提供多线程,并行式、分布式的运算函数。首先,运算用户程序后应爆发R类型结果还要,作为一种reactive软件,必须确保完全消耗上一阶段爆发的所有R类型元素。下边是一个用户函数的款型:

Process-Node是SDP最根本的一个组成部分,因为多数用户定义的各种事务功效是在那边运算的。用户能够选拔对事情功能举办拆分然后分担给不同的线程或不同的集群节点开展多线程并行或分布式的演算。SDP应该为用户程序提供多线程,并行式、分布式的运算函数。首先,运算用户程序后应发生R类型结果同时,作为一种reactive软件,必须保证完全消耗上一阶段暴发的富有R类型元素。下面是一个用户函数的款型:

2、Sink:数据终端。属于数据元素的使用方,首要意义是消耗多少流中的元素。SinkShape是有一个输入端的数据流形状。Sink消耗流元素的事例有:

  type UserFunc = R => R 
  type UserFunc = R => R 
  /**
   * A `Sink` that will consume the stream and discard the elements.
   */
  def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink)

  /**
   * A `Sink` that will invoke the given procedure for each received element. The sink is materialized
   * into a [[scala.concurrent.Future]] will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream..
   */
  def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]] =
    Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")

除却fire-and-run类型的演算函数,SDP还应当提供针对性多线程或分布式程序的map-reduce式运算函数。初阶想法是:无论重临结果与否,分派任务都是由persistence-actor来施行的,这样能担保不会挂一漏万任何任务。假若完全任务急需在所有分派任务重回运算结果后再统一开展深度运算时akka的actor消息使得情势是最符合可是的了。具体意况可以参见我眼前关于cluster-sharding的博文。

而外fire-and-run类型的运算函数,SDP还应有提供针对性多线程或分布式程序的map-reduce式运算函数。起始想法是:无论重回结果与否,分派任务都是由persistence-actor来推行的,这样能确保不会挂一漏万任何任务。即便完全任务急需在具备分派任务再次来到运算结果后再统一举办深度运算时akka的actor音讯使得模式是最适合但是的了。具体境况可以参考我面前关于cluster-sharding的博文。

留意,akka-stream实际是在actor上举办演算的。actor的里边景观最终得以形成运算结果。下面的例证可以得出Sink的运算结果是Future[??]类型的。

Sink的紧要效用实际上是承保完全消耗程序中发生的拥有因素,这是reactive类型程序的总得要求。

Sink的要害功能实际上是保险完全消耗程序中暴发的享有因素,这是reactive类型程序的总得要求。

3、Flow:数据处理节点。对因此输入端口输入数据流的因素举行转变处理(transform)后经过输出端口输出。FlowShape有一个输入端和一个输出端。

好了,不知不觉还有多少个时辰就进去2017倒计时了。飞速凑合着在跨入2018事先把那篇发表出去,刚好是当年的末尾一篇博文。祝各位在新的一年中劳作生活顺利!

好了,不知不觉还有多少个钟头就进入2017倒计时了。连忙凑合着在跨入2018从前把这篇宣布出来,刚好是现年的末段一篇博文。祝各位在新的一年中工作生活顺利!

在akka-stream里多少流组件一般被叫做数据流图(graph)。我们可以用成千上万多少流图组成更大的stream-graph。

 

 

akka-stream最简易的一体化(或者关闭)线性数据流(linear-stream)就是一直把一个Source和一个Sink相接。这种方法表示一种对数据流所有因素的直白显示,如:source.runWith(Sink.foreach(println))。我们可以用Source.via来连接Flow,用Source.to连接Sink:

 

 

  override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)

  override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = {
    val toAppend =
      if (flow.traversalBuilder eq Flow.identityTraversalBuilder)
        LinearTraversalBuilder.empty()
      else
        flow.traversalBuilder

    new Source[T, Mat3](
      traversalBuilder.append(toAppend, flow.shape, combine),
      SourceShape(flow.shape.out))
  }

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left)

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = {
    RunnableGraph(traversalBuilder.append(sink.traversalBuilder, sink.shape, combine))
  }

 

 

可以窥见via和to分别是viaMat和toMat的简写,分别固定了Keep.left。意思是挑选左侧数据流图的演算结果。我们地点提过akka-stream是在actor系统里处理多少流元素的。在这些历程中并且可以用actor内部状况来发生运算结果。via和to连接了左右多少个graph,并且选用了左侧graph的运算结果。大家可以用viaMat和toMat来摘取左边graph运算结果。这是因此combine:
(Mat,Mat2)=>Mat3以此函数实现的。akka-stream提供了一个Keep对象来表达这种拔取:

 

 

/**
 * Convenience functions for often-encountered purposes like keeping only the
 * left (first) or only the right (second) of two input values.
 */
object Keep {
  private val _left = (l: Any, r: Any) ⇒ l
  private val _right = (l: Any, r: Any) ⇒ r
  private val _both = (l: Any, r: Any) ⇒ (l, r)
  private val _none = (l: Any, r: Any) ⇒ NotUsed

  def left[L, R]: (L, R) ⇒ L = _left.asInstanceOf[(L, R) ⇒ L]
  def right[L, R]: (L, R) ⇒ R = _right.asInstanceOf[(L, R) ⇒ R]
  def both[L, R]: (L, R) ⇒ (L, R) = _both.asInstanceOf[(L, R) ⇒ (L, R)]
  def none[L, R]: (L, R) ⇒ NotUsed = _none.asInstanceOf[(L, R) ⇒ NotUsed]
}

 

 

既然涉及运算结果的处理模式,大家就来探视Source,Flow,Sink的门类参数:

Source[+Out, +Mat]       //Out代表元素类型,Mat为运算结果类型
Flow[-In, +Out, +Mat]    //In,Out为数据流元素类型,Mat是运算结果类型
Sink[-In, +Mat]          //In是数据元素类型,Mat是运算结果类型

Keep对象提供的是对Mat的选项。下面源代码中to,toMat函数的回到结果都是RunnableGraph[Mat3],也就是说唯有连接了Sink的数据流才能举行演算。RunnableGraph提供一个run()函数来运算数据流:

/**
 * Flow with attached input and output, can be executed.
 */
final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
  override def shape = ClosedShape

  /**
   * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
   */
  def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableGraph[Mat2] =
    copy(traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]))

  /**
   * Run this flow and return the materialized instance from the flow.
   */
  def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
...

下面shape =
ClosedShape代表RunnableGraph的形状是虚掩的(ClosedShape),意思是说:一个可运行的graph所有输人输出端口都必须是连接的。

下边我们就用一个最简便的线性数据流来做些详细表明:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka._
import scala.concurrent._

object SourceDemo extends App {
  implicit val sys=ActorSystem("demo")
  implicit val mat=ActorMaterializer()
  implicit val ec=sys.dispatcher

  val s1: Source[Int,NotUsed] = Source(1 to 10)
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)
  val rg1: RunnableGraph[NotUsed] = s1.to(sink)
  val rg2: RunnableGraph[Future[Done]]  = s1.toMat(sink)(Keep.right)
  val res1: NotUsed = rg1.run()

  Thread.sleep(1000)

  val res2: Future[Done] = rg2.run()
  res2.andThen {
    case _ => sys.terminate()
  }

}

咱俩把大旨放在特别讲明的结果类型方面:Source的演算结果Mat类型是NotUsed,Sink的运算结果Mat类型是Future[Done]。从下面那段代码大家看出用toMat接纳回到Sink的演算结果Future[Done]才能捕捉到运算终止节点。下边的另一个事例包括了部分构成动作:

  val seq = Seq[Int](1,2,3)
  def toIterator() = seq.iterator
  val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2)
  val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3)
  val s2 = Source.fromIterator(toIterator)
  val s3 = s1 ++ s2

  val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right)
  val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right)
  val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(Keep.right)
  (s5.toMat(sink)(Keep.right).run()).andThen {case _ => sys.terminate()}

一般来讲,数据流元素的享有处理过程都合并在一个actor上举行(steps-fusing),这样可以免去actor之间的消息传递,但同时也会限制数量元素的并行处理。aync的功效是指定左侧的graph在一个单身的actor上运行。注意:s6=s5。

从地点例子里的咬合结果类型我们发现:把一个Flow连接到一个Source上形成了一个新的Source。

实在我们得以用akka-stream
Source提供的点子糖来直接运算数据流,如下:

  s1.runForeach(println)
  val fres = s6.runFold(0)(_ + _)
  fres.onSuccess{case a => println(a)}
  fres.andThen{case _ => sys.terminate()}

下边是Source中的一些runner:

 /**
   * Shortcut for running this `Source` with a fold function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] = runWith(Sink.fold(zero)(f))

  /**
   * Shortcut for running this `Source` with a foldAsync function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFoldAsync[U](zero: U)(f: (U, Out) ⇒ Future[U])(implicit materializer: Materializer): Future[U] = runWith(Sink.foldAsync(zero)(f))

  /**
   * Shortcut for running this `Source` with a reduce function.
   * The given function is invoked for every received element, giving it its previous
   * output (from the second element) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   *
   * If the stream is empty (i.e. completes before signalling any elements),
   * the reduce stage will fail its downstream with a [[NoSuchElementException]],
   * which is semantically in-line with that Scala's standard library collections
   * do in such situations.
   */
  def runReduce[U >: Out](f: (U, U) ⇒ U)(implicit materializer: Materializer): Future[U] =
    runWith(Sink.reduce(f))

  /**
   * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
   * for each received element.
   * The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream.
   */
  // FIXME: Out => Unit should stay, right??
  def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Done] = runWith(Sink.foreach(f))

它们的法力都是由此runWith实现的:

 /**
   * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
   * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
   */
  def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = toMat(sink)(Keep.right).run()

其实是利用了Sink类里的应和措施Sink.???。

下边是此次的示范源代码:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka._
import scala.concurrent._

object SourceDemo extends App {
  implicit val sys=ActorSystem("demo")
  implicit val mat=ActorMaterializer()
  implicit val ec=sys.dispatcher

  val s1: Source[Int,NotUsed] = Source(1 to 10)
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)
  val rg1: RunnableGraph[NotUsed] = s1.to(sink)
  val rg2: RunnableGraph[Future[Done]]  = s1.toMat(sink)(Keep.right)
  val res1: NotUsed = rg1.run()

  Thread.sleep(1000)

  val res2: Future[Done] = rg2.run()
  res2.andThen {
    case _ =>   //sys.terminate()
  }

  val seq = Seq[Int](1,2,3)
  def toIterator() = seq.iterator
  val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2)
  val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3)
  val s2 = Source.fromIterator(toIterator)
  val s3 = s1 ++ s2

  val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right)
  val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right)
  val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(Keep.right)
  (s5.toMat(sink)(Keep.right).run()).andThen {case _ => } //sys.terminate()}

  s1.runForeach(println)
  val fres = s6.runFold(0)(_ + _)
  fres.onSuccess{case a => println(a)}
  fres.andThen{case _ => sys.terminate()}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

相关文章