鉴于fs2基本沿用了scalaz-stream的规律,由于fs2基本沿用了scalaz-stream的法则

 

  • type Pipe[F,A,B] = Stream[F,A] => Stream[F,B]
  • type Pipe2[F,A,B,C] = (Stream[F,A], Stream[F,B]) => Stream[F,C]
  • Pipe替代了ChannelProcess1
  • Pipe2替代了TeeWye

与List和Stream操作相似,大家一致能够对scalar-stream
Process施用同样的操作函数,也正是部分stream转换函数:

myTake和myTakeC发生了差别的结果。

我们先运算那一个运算流,结果为3个Task,然后再运算Task来获得运算值:

Wye[-I1,-I2,+O]:二对一的冬季输入数据转换节点:不按左右一一,按上游数据发送情形接受I1,I2类型输入后转移成O类型数据输出

  • type Pipe[F,A,B] = Stream[F,A] => Stream[F,B]
  • type Pipe2[F,A,B,C] = (Stream[F,A], Stream[F,B]) => Stream[F,C]
  • Pipe 替代了 Channel 和 Process1 
  • Pipe2 替代了 Tee 和 Wye
1 Stream(1,2,3).repeat2   .throughPure(pipe.take(10))3   .throughPure(pipe.filter(_ % 2 == 0))4   .toList                                    //> res13: List[Int] = List

Process又分以下几类:

1 val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))
2 val t2 = s2.runLog                                //> t2  : fs2.Task[Vector[Int]] = Task
3 t2.unsafeRun                                      //> res22: Vector[Int] = Vector(1, 2, 3)

④ 、更简短的流转换组件(stream
transformation primitives)

scalaz-stream是个积极读取格局的流(pull
model stream),Process转换stream的法子不是以Stream[I] =>
Stream[O]那种函数情势,而是一种状态转换方式展开(state
transition),所以这个情况就十分向一个驱动程序发出的伸手:

作者们先运算那些运算流,结果为1个Task,然后再运算Task来博取运算值:

叁 、fs2不再只局限于Task一种副成效运算方式。用户可以提供温馨的effect类型

以下是有的不难的Process创设情势:

   
fs2是scalaz-stream的风行版本,沿用了scalaz-stream被动式(pull
model)数据流原理但选取了全新的贯彻情势。fs2相比scalaz-stream而言具备了:更简单的基础零部件(combinator)、更安全的类型、能源使用(type
safe, resource
safety)、更高的演算成效。由于fs2基本沿用了scalaz-stream的原理,所以大家会在下边包车型地铁议论里重点介绍fs2的采纳。依照fs2的法定文件,fs2具备了以下新的性状:

大家得以用through来连接这几个transducer:

Await[+F[_],A,+O]:供给运算F[A],得出F[A]的结果A后输入函数rcv再运算得出下二个Process状态。那一个是flatMap函数的结构化版本

 

1 s3.through(myFilter(_ % 2 == 0)).through(myTake(3)).runLog.unsafeRun2                                                   //> res23: Vector[Int] = Vector

Sink[+F[_],-O]:运算终点,在此对O类型数据开始展览F运算,不重临值:O
=> F[Unit]

1 val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))

⑧ 、Stream取代了Process。fs第22中学再没有Process壹 、Tee、Wye、Channel那些项目别称,取而代之的是:

  type Process0[+O] = Process[Nothing,O]  /**   * A single input stream transducer. Accepts input of type `I`,   * and emits values of type `O`.   */  type Process1[-I,+O] = Process[Env[I,Any]#Is, O]  /**   * A stream transducer that can read from one of two inputs,   * the 'left' (of type `I`) or the 'right' (of type `I2`).   * `Process1[I,O] <: Tee[I,I2,O]`.   */  type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O]  /**   * A stream transducer that can read from one of two inputs,   * non-deterministically.   */  type Wye[-I,-I2,+O] = Process[Env[I,I2]#Y, O]  /**   * An effectful sink, to which we can send values. Modeled   * as a source of effectful functions.   */  type Sink[+F[_],-O] = Process[F, O => F[Unit]]  /**   * An effectful channel, to which we can send values and   * get back responses. Modeled as a source of effectful   * functions.   */  type Channel[+F[_],-I,O] = Process[F, I => F[O]]

 

fs2的绝活应该是二十多线程编制程序了。在Stream的品类款式中:Stream[F[_],A],F[_]是一种恐怕发生副作用的演算格局,当F[_]等于Nothing时,Stream[Nothing,A]是一种纯数据流,而Stream[F[_],A]正是一种运算流了。我们得以在对运算流举行状态转换的历程中展开演算来落到实处F的副成效如:数据库读写、IO操作等。fs2不再绑定Task一种运算情势了。任何有Catchable实例的Monad都足以改为Stream的演算格局。可是,作为一种以三十二线程编制程序为主导的工具库,没有啥运算格局会比Task更适于了。
咱俩得以把二个纯数据流升格成运算流:

里头F是个高阶类,是一种算法,O是Process的运算值。从类型款式上看Process是个对O类型值进行F运算的节点,那么scalaz-stream就应有是个运算流了。Process包涵以下二种情景:

 

1 val pll = Stream(1,2,3).pure.open    //> pll  : fs2.Pull[fs2.Pure,Nothing,fs2.Stream.Handle[fs2.Pure,Int]] = fs2.Pull2 de5031f3 val strm = pll.close                 //> strm  : fs2.Stream[fs2.Pure,Nothing] = evalScope(Scope(Bind(Eval,<4 function1>))).flatMap(<function1>)
1  p.take(2).runLog.run                             //> res14: Vector[Int] = Vector2  p.filter {_ > 2}.runLog.run                      //> res15: Vector[Int] = Vector3  p.last.runLog.run                                //> res16: Vector[Int] = Vector4  p.drop(1).runLog.run                             //> res17: Vector[Int] = Vector5  p.exists{_ > 5}.runLog.run                       //> res18: Vector[Boolean] = Vector
 1 (Stream(1,2,3) ++ Stream(4,5)).toList             //> res5: List[Int] = List(1, 2, 3, 4, 5)
 2 Stream(1,2,3).map { _ + 1}.toList                 //> res6: List[Int] = List(2, 3, 4)
 3 Stream(1,2,3).filter { _ % 2 == 0}.toList         //> res7: List[Int] = List(2)
 4 Stream(1,2,3).fold(0)(_ + _).toList               //> res8: List[Int] = List(6)
 5 Stream(None,Some(1),Some(3),None).collect {
 6   case None => 0
 7   case Some(i) => i
 8 }.toList                                          //> res9: List[Int] = List(0, 1, 3, 0)
 9 Stream.range(1,5).intersperse(42).toList          //> res10: List[Int] = List(1, 42, 2, 42, 3, 42, 4)
10 Stream(1,2,3).flatMap {x => Stream(x,x)}.toList   //> res11: List[Int] = List(1, 1, 2, 2, 3, 3)
11 Stream(1,2,3).repeat.take(5).toList               //> res12: List[Int] = List(1, 2, 3, 1, 2)

上面包车型的士事例里展现了fs2的运算流从源头到传换(Transducer)向来到终点的采用示范:

1 val p: Process[Task,Int] = Process.emitAll(Seq(1,2,3))  //> p  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append,Vector(<function1>))2  Process.range(1,2,3).toSource                   //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append,Vector(<function1>))3  //把F[A]升格成Process[F,A]4 Process.eval(Task.delay {5 * 8})                 //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@56aac163,<function1>,<function1>)

 

1 val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk)2 val t2 = s2.runLog                                //> t2  : fs2.Task[Vector[Int]] = Task3 t2.unsafeRun                                      //> res22: Vector[Int] = Vector
/**   * Collect the outputs of this `Process[F,O]` into a Monoid `B`, given a `Monad[F]` in   * which we can catch exceptions. This function is not tail recursive and   * relies on the `Monad[F]` to ensure stack safety.   */final def runFoldMap[F2[x] >: F[x], B](f: O => B)(implicit F: Monad[F2], C: Catchable[F2], B: Monoid[B]): F2[B] = {...}/**   * Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in   * which we can catch exceptions. This function is not tail recursive and   * relies on the `Monad[F]` to ensure stack safety.   */final def runLog[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Vector[O2]] = {...}/** Run this `Process` solely for its final emitted value, if one exists. */final def runLast[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Option[O2]] = {...}/** Run this `Process` solely for its final emitted value, if one exists, using `o2` otherwise. */final def runLastOr[F2[x] >: F[x], O2 >: O](o2: => O2)(implicit F: Monad[F2], C: Catchable[F2]): F2[O2] =    runLast[F2, O2] map { _ getOrElse o2 }/** Run this `Process`, purely for its effects. */final def run[F2[x] >: F[x]](implicit F: Monad[F2], C: Catchable[F2]): F2[Unit] =    F.void(drain.runLog
 1 object pipe {
 2 ...
 3 /** Drop `n` elements of the input, then echo the rest. */
 4   def drop[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
 5     _ pull (h => Pull.drop(n)(h) flatMap Pull.echo)
 6 ...
 7 /** Emits `true` as soon as a matching element is received, else `false` if no input matches */
 8   def exists[F[_], I](p: I => Boolean): Stream[F, I] => Stream[F, Boolean] =
 9     _ pull { h => Pull.forall[F,I](!p(_))(h) flatMap { i => Pull.output1(!i) }}
10 
11   /** Emit only inputs which match the supplied predicate. */
12   def filter[F[_], I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
13     mapChunks(_ filter f)
14 
15   /** Emits the first input (if any) which matches the supplied predicate, to the output of the returned `Pull` */
16   def find[F[_],I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
17     _ pull { h => Pull.find(f)(h).flatMap { case o #: h => Pull.output1(o) }}
18 
19 
20   /**
21    * Folds all inputs using an initial value `z` and supplied binary operator,
22    * and emits a single element stream.
23    */
24   def fold[F[_],I,O](z: O)(f: (O, I) => O): Stream[F,I] => Stream[F,O] =
25     _ pull { h => Pull.fold(z)(f)(h).flatMap(Pull.output1) }
26 ...
27 /** Emits all elements of the input except the first one. */
28   def tail[F[_],I]: Stream[F,I] => Stream[F,I] =
29     drop(1)
30 
31   /** Emit the first `n` elements of the input `Handle` and return the new `Handle`. */
32   def take[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
33     _ pull Pull.take(n)
34 ...
 1 def stdOut: Sink[Task,String]  = 2   _.evalMap { x => Task.delay{ println(s"milli: $x")}} 3                                                   //> stdOut: => fs2.Sink[fs2.Task,String] 4 Stream.repeatEval(Task.delay{System.currentTimeMillis}) 5   .map(_.toString) 6   .through(myTake(3)) 7   .to 8   .run.unsafeRun                                  //> milli: 1472001934708 9                                                   //| milli: 147200193471410                                                   //| milli: 1472001934714

上述这么些函数与scala标准库的stream很一般。再看看map,flatMap吧:

再示范另贰个Pipe的贯彻:take

1 Stream(1,2,3).repeat.pure2   .through(pipe.take(10))3   .through(pipe.filter(_ % 2 == 0))4   .toList                                         //> res14: List[Int] = List

这几个是纯数据流的创设立模型式。scalaz-stream平常把Task作为F运算,下边是Task运算流的创设或许转换方法:

ca88亚洲城网站, 

果真在Handle提供的函数里有await,receive等那几个读取函数。大家试着来促成二个简便的transducer:3个filter函数:

Process1[-I,+O]:一对一的数目转换节点:接收三个I类型输入,经过处理转换来O类型数据输出

implicit class HandleOps[+F[_],+A](h: Handle[F,A]) {
    def push[A2>:A](c: Chunk[A2])(implicit A2: RealSupertype[A,A2]): Handle[F,A2] =
      self.push(h: Handle[F,A2])(c)
    def push1[A2>:A](a: A2)(implicit A2: RealSupertype[A,A2]): Handle[F,A2] =
      self.push1(h: Handle[F,A2])(a)
    def #:[H](hd: H): Step[H, Handle[F,A]] = Step(hd, h)
    def await: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.await(h)
    def await1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.await1(h)
    def awaitNonempty: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = Pull.awaitNonempty(h)
    def echo1: Pull[F,A,Handle[F,A]] = Pull.echo1(h)
    def echoChunk: Pull[F,A,Handle[F,A]] = Pull.echoChunk(h)
    def peek: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.peek(h)
    def peek1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.peek1(h)
    def awaitAsync[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):
      Pull[F2, Nothing, AsyncStep[F2,A2]] = self.awaitAsync(Sub1.substHandle(h))
    def await1Async[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):
      Pull[F2, Nothing, AsyncStep1[F2,A2]] = self.await1Async(Sub1.substHandle(h))
    def covary[F2[_]](implicit S: Sub1[F,F2]): Handle[F2,A] = Sub1.substHandle(h)
  }

  implicit class HandleInvariantEffectOps[F[_],+A](h: Handle[F,A]) {
    def invAwait1Async[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):
      Pull[F, Nothing, AsyncStep1[F,A2]] = self.await1Async(h)
    def invAwaitAsync[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):
      Pull[F, Nothing, AsyncStep[F,A2]] = self.awaitAsync(h)
    def receive1[O,B](f: Step[A,Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await1.flatMap(f)
    def receive[O,B](f: Step[Chunk[A],Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await.flatMap(f)
  }
 1 import scala.language.higherKinds 2 def myFilter[F[_],A](f: A => Boolean): Pipe[F, A, A] = { 3   def go(h: Stream.Handle[F,A]): Pull[F,A,Unit] =  { 4 //      h.receive1 {case Step => if Pull.output1 >> go else go} 5        h.await1.flatMap { case Step => if Pull.output1 >> go else go} 6   } 7 //  sin => sin.open.flatMap {h => go}.close 8   sin => sin.pull 9 }                                   //> myFilter: [F[_], A](f: A => Boolean)fs2.Pipe[F,A,A]10 11 Stream.range(0,10).pure.through(myFilter(_ % 2 == 0)).toList12                                      //> res17: List[Int] = List(0, 2, 4, 6, 8)
case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]case class Await[+F[_], A, +O](    req: F[A]    , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance =  => Trampoline.delay(halt:Process[F,Nothing])    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] {...}case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]case class Append[+F[_], +O](    head: HaltEmitOrAwait[F, O]    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance    ) extends Process[F, O] {...}   

 

纯数据流具备了无数与List相似的操作函数:

哈尔t:甘休发送

1 Stream.emits(Seq(1,2,3)).toList        //> res3: List[Int] = List(1, 2, 3)
2 Stream.emits(Seq(1,2,3)).toVector      //> res4: Vector[Int] = Vector(1, 2, 3)
 1 def myTakeC[F[_],A]: Pipe[F,A,A] = { 2   def go: Stream.Handle[F,A] => Pull[F,A,Unit] = h => { 3      if ( n <= 0 ) Pull.done 4      else Pull.awaitLimit.flatMap {case Step => 5        if (chunk.size <= n) Pull.output >> go(n-chunk.size) 6        else Pull.output(chunk.take } 7   } 8   sin => sin.pull 9 }                       //> myTakeC: [F[_], A]fs2.Pipe[F,A,A]10 val s1 = (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7))11                        //> s1  : fs2.Stream[Nothing,Int] = append(append(Segment(Emit(Chunk, S12 egment(Emit)).flatMap(<function1>)), Segment(Emit)).fla13 tMap(<function1>))14 s1.pure.through(myTake(4)).chunks.toList  //> res20: List[fs2.Chunk[Int]] = List, Chunk, Chunk, Chunk15 s1.pure.through(myTakeC(4)).chunks.toList //> res21: List[fs2.Chunk[Int]] = List(Chunk, Chunk
 1  //runFoldMap就好比Monoid的sum 2  p.runFoldMap.run                       //> res6: Int = 6 3  p.runFoldMap(i => i * 2).run                     //> res7: Int = 12 4  p.runFoldMap(_.toString).run                     //> res8: String = 123 5  //runLog把收到的元素放入vector中 6  p.runLog.run                                     //> res9: Vector[Int] = Vector 7  //runLast取最后一个元素,返回Option 8  p.runLast.run                                    //> res10: Option[Int] = Some 9  Process.halt.toSource.runLast.run                //> res11: Option[Nothing] = None10  Process.halt.toSource.runLastOr(65).run          //> res12: Int = 6511  //run只进行F的运算,放弃所有元素12  p.run      //> res13: scalaz.concurrent.Task[Unit] = scalaz.concurrent.Task@26b3fd4113  p.run.run  //Task[Unit] 返回Unit14  Process.emit(print("haha")).toSource.run.run     //> haha
1 Stream(1,2,3).repeat.take(10).filter(_ % 2 == 0).toList
2                                   //> res15: List[Int] = List(2, 2, 2)

下面咱们来看望fs2的有的基本操作:

Emit的意义是产生2个O值,Await的法力是运算F然后连年下一个Process,
Append的机能则是把前1个Process的音讯传递到下二个Process。Await和Append分别是分裂方法的Process连接情势。

/**
 * Chunk represents a strict, in-memory sequence of `A` values.
 */
trait Chunk[+A] { self =>
  def size: Int
  def uncons: Option[(A, Chunk[A])] =
    if (size == 0) None
    else Some(apply(0) -> drop(1))
  def apply(i: Int): A
  def copyToArray[B >: A](xs: Array[B]): Unit
  def drop(n: Int): Chunk[A]
  def take(n: Int): Chunk[A]
  def filter(f: A => Boolean): Chunk[A]
  def foldLeft[B](z: B)(f: (B,A) => B): B
  def foldRight[B](z: B)(f: (A,B) => B): B
  def indexWhere(p: A => Boolean): Option[Int] = {
    val index = iterator.indexWhere(p)
    if (index < 0) None else Some(index)
  }
  def isEmpty = size == 0
  def toArray[B >: A: ClassTag]: Array[B] = {
    val arr = new Array[B](size)
    copyToArray(arr)
    arr
  }
  def toList = foldRight(Nil: List[A])(_ :: _)
  def toVector = foldLeft(Vector.empty[A])(_ :+ _)
  def collect[B](pf: PartialFunction[A,B]): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    iterator.collect(pf).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def map[B](f: A => B): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    iterator.map(f).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def mapAccumulate[S,B](s0: S)(f: (S,A) => (S,B)): (S,Chunk[B]) = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    var s = s0
    for { c <- iterator } {
      val (newS, newC) = f(s, c)
      buf += newC
      s = newS
    }
    (s, Chunk.indexedSeq(buf))
  }
  def scanLeft[B](z: B)(f: (B, A) => B): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size + 1)
    iterator.scanLeft(z)(f).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def iterator: Iterator[A] = new Iterator[A] {
    var i = 0
    def hasNext = i < self.size
    def next = { val result = apply(i); i += 1; result }
  }
...
1 val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk)

Tee[-I1,-I2,+O]:二对一的静止输入数据转换节点:从左右两边一左一右有顺接受I1,I2类型输入后转移成O类型数据输出

 

小编们见到Pipe正是几个Function1的品种外号,1个lambda:提供3个Stream[F,I],返回Stream[F,O]。那么在fs2里是怎样读取2个Stream[F,I]里的要素呢?我们日前提到是透过3个新的数据结构Pull来达成的,先来看看fs2是怎么样兑现Stream
>> Pull >> Stream转换的:

Append:连接内外七个Process

在上边的事例里我们运用了through,to等连接函数。由于数量最终发送到终点stdOut,大家不要用runLog来记录运算结果。

Stream的档次款式是:Stream[F[_],A]。从上面的例证大家看看有着的F[_]都是Nothing,大家称那样的流为纯数据流(pure
stream)。再值得注意的是各样流创设都形成了七个Chunk,代表一节元素。fs2扩展了Chunk类型来抓好多少成分处理效用。那是fs2的一项新成效。

那多少个函数都回去F2运算,借使F2是Task的话那么大家就能够用Task.run来收获结果值:

 1 def myTakeC[F[_],A](n: Int): Pipe[F,A,A] = {
 2   def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {
 3      if ( n <= 0 ) Pull.done
 4      else Pull.awaitLimit(n)(h).flatMap {case Step(chunk,h) =>
 5        if (chunk.size <= n) Pull.output(chunk) >> go(n-chunk.size)(h)
 6        else Pull.output(chunk.take(n)) }
 7   }
 8   sin => sin.pull(go(n))
 9 }                       //> myTakeC: [F[_], A](n: Int)fs2.Pipe[F,A,A]
10 val s1 = (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7))
11                        //> s1  : fs2.Stream[Nothing,Int] = append(append(Segment(Emit(Chunk(1, 2))), S
12 egment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).fla
13 tMap(<function1>))
14 s1.pure.through(myTake(4)).chunks.toList  //> res20: List[fs2.Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4))
15 s1.pure.through(myTakeC(4)).chunks.toList //> res21: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4))

笔者们从Pull里用await1可能receive1把三个Step数据结构从Handle里扯出来然后再output到Pull结构里。把那几个Pull
close后取得我们供给的Stream。大家把例子使用的门类及函数款式陈列在底下:

1  p.map{i => s"Int:$i"}.runLog.run                 //> res19: Vector[String] = Vector(Int:1, Int:2, Int:3)2  p.flatMap{i => Process(i,i-1)}.runLog.run        //> res20: Vector[Int] = Vector(1, 0, 2, 1, 3, 2)

Stream的花色款式是:Stream[F[_],A]。从上面包车型的士事例我们见到全体的F[_]都以Nothing,大家称那样的流为纯数据流(pure
stream)。再值得注意的是每个流创设都形成了贰个Chunk,代表一节成分。fs2扩充了Chunk类型来提升数据成分处理效用。那是fs2的一项新功效。

1 Stream(1,2,3).repeat.take(10).filter(_ % 2 == 0).toList2                                   //> res15: List[Int] = List

周详检查能够看出来下边的这么些转换操作都以针对Process1类型的,都以因素在流通进程中拿走更换。我们会在下篇斟酌中牵线一些更扑朔迷离的Process操作,如:Sink,Tee,Wyn…,然后是scalaz-stream的实际运用

class Pull[+F[_],+O,+R](private[fs2] val get: Free[P[F,O]#f,Option[Either[Throwable,R]]])

在Pull的项目参数中F是1个运算,O代表输出成分类型,LX570代表Pull里的数目能源。大家得以从Qashqai读取成分。在上头的例证里pll的哈弗值是个Handle类型。那几个项目里应该提供了读取成分的章程:

1  Process.emit(1)                                  //> res0: scalaz.stream.Process0[Int] = Emit)2  Process.emitAll(Seq(1,2,3))                      //> res1: scalaz.stream.Process0[Int] = Emit(List3  Process.halt                                     //> res2: scalaz.stream.Process0[Nothing] = Halt4  Process.range(1,2,3)           //> res3: scalaz.stream.Process0[Int] = Append,Vector(<function1>))
1 def myTake[F[_],A](n: Int): Pipe[F,A,A] = {
2    def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {
3       if (n <= 0) Pull.done
4       else h.receive1 { case a #: h => Pull.output1(a).flatMap{_ => go(n-1)(h)}}
5    }
6    sin => sin.pull(go(n))
7 }                                                 //> myTake: [F[_], A](n: Int)fs2.Pipe[F,A,A]
8 Stream.range(0,10).pure.through(myTake(3)).toList //> res18: List[Int] = List(0, 1, 2)
/** * Chunk represents a strict, in-memory sequence of `A` values. */trait Chunk[+A] { self =>  def size: Int  def uncons: Option[(A, Chunk[A])] =    if (size == 0) None    else Some(apply(0) -> drop(1))  def apply: A  def copyToArray[B >: A](xs: Array[B]): Unit  def drop: Chunk[A]  def take: Chunk[A]  def filter(f: A => Boolean): Chunk[A]  def foldLeft[B] => B): B  def foldRight[B] => B): B  def indexWhere(p: A => Boolean): Option[Int] = {    val index = iterator.indexWhere    if (index < 0) None else Some  }  def isEmpty = size == 0  def toArray[B >: A: ClassTag]: Array[B] = {    val arr = new Array[B]    copyToArray    arr  }  def toList = foldRight(Nil: List[A])  def toVector = foldLeft(Vector.empty[A])(_ :+ _)  def collect[B](pf: PartialFunction[A,B]): Chunk[B] = {    val buf = new collection.mutable.ArrayBuffer[B]    iterator.collect.copyToBuffer    Chunk.indexedSeq  }  def map[B](f: A => B): Chunk[B] = {    val buf = new collection.mutable.ArrayBuffer[B]    iterator.map.copyToBuffer    Chunk.indexedSeq  }  def mapAccumulate[S,B] => : (S,Chunk[B]) = {    val buf = new collection.mutable.ArrayBuffer[B]    var s = s0    for { c <- iterator } {      val (newS, newC) = f      buf += newC      s = newS    }    (s, Chunk.indexedSeq  }  def scanLeft[B] => B): Chunk[B] = {    val buf = new collection.mutable.ArrayBuffer[B](size + 1)    iterator.scanLeft.copyToBuffer    Chunk.indexedSeq  }  def iterator: Iterator[A] = new Iterator[A] {    var i = 0    def hasNext = i < self.size    def next = { val result = apply; i += 1; result }  }...
sealed trait Process[+F[_], +O]
1 val pll = Stream(1,2,3).pure.open    //> pll  : fs2.Pull[fs2.Pure,Nothing,fs2.Stream.Handle[fs2.Pure,Int]] = fs2.Pull
2 de5031f
3 val strm = pll.close                 //> strm  : fs2.Stream[fs2.Pure,Nothing] = evalScope(Scope(Bind(Eval(Snapshot),<
4 function1>))).flatMap(<function1>)

fs2的抢先八分之四变换函数都考虑了对Chunk数据的拍卖体制。我们先看看fs2是什么展现Chunk数据的:

对stream的Process实行演算有下边二种run方法:

1 Stream(1,2,3).repeat.pure
2   .through(pipe.take(10))
3   .through(pipe.filter(_ % 2 == 0))
4   .toList                                         //> res14: List[Int] = List(2, 2, 2)

当今采取myTake和myFilter就不须要pure升格了:

Emit[+O]:请求发一个O值

 

对1个Stream施用open后获得八个Pull类型。pll是个Pull数据结构,它的类型定义如下:

能够看到Emit,Await,哈尔t,Append都以Process类型的结构化状态。个中Await正是flatMap函数的结构化,Emit就好像Return,所以Process就是2个Free
Monad。

 

咱俩得以用toList可能toVector来运算纯数据流中的成分值:

Process[F[_],O]:source:运算流源点,由此发送F[O]运算

五 、扩张了越来越多并行运算组件(concurrent
primitives)

1 def myTake[F[_],A]: Pipe[F,A,A] = {2    def go: Stream.Handle[F,A] => Pull[F,A,Unit] = h => {3       if (n <= 0) Pull.done4       else h.receive1 { case a #: h => Pull.output1.flatMap{_ => go(n-1)}}5    }6    sin => sin.pull7 }                                                 //> myTake: [F[_], A]fs2.Pipe[F,A,A]8 Stream.range(0,10).pure.through(myTake(3)).toList //> res18: List[Int] = List

scalaz-stream是三个泛函数据流配件库(functional stream combinator
library),尤其适用于函数式编制程序。scalar-stream是由二个上述各样气象的Process串联组成。stream代表延续串的因素,恐怕是活动发出恐怕由外部的源流输入,如:延续串鼠标地点;文件中的文字行;数据库记录;又也许三番五次串的HTTP请求等。Process就是stream转换器(transducer),它能够把一种stream转换成另一种stream。Process的档次款式如下:

 

以上都是局地核心的List操作函数示范。

Channel[+F[_],-I,O]:运算终点,接受I类型输入,举办F运算后重返F[O]:I
=> F[O]

 

伍 、扩展了更加多并行运算组件(concurrent
primitives)

Process0[+O]:>>>Process[Nothing,+O]:source:纯数据流源点,发送O类型元素

我们早已提过fs2功效提高的里边一项是充实了节组(Chunk)数据类型和血脉相通的操作函数。Chunk是fs2内部使用的一种集合,那样fs2就足以一节一节(by
chunks)来拍卖数据了。Chunk自己有所了全部的集结函数:

1 Stream.emits(Seq(1,2,3)).toList        //> res3: List[Int] = List2 Stream.emits(Seq(1,2,3)).toVector      //> res4: Vector[Int] = Vector
1 Stream(1,2,3).repeat
2   .throughPure(pipe.take(10))
3   .throughPure(pipe.filter(_ % 2 == 0))
4   .toList                                    //> res13: List[Int] = List(2, 2, 2)
1 Stream()                       //> res0: fs2.Stream[Nothing,Nothing] = Segment(Emit2 Stream(1,2,3)                  //> res1: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk)3 Stream.emit(4)                 //> res2: fs2.Stream[Nothing,Int] = Segment(Emit)4 Stream.emits(Seq(1,2,3))       //> res3: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk)

对二个Stream施用open后取得一个Pull类型。pll是个Pull数据结构,它的类型定义如下:

七 、stream状态转换选拔了崭新的贯彻形式,使用了新的数据结构:Pull

六 、通过bracket函数增强了能源接纳安全,尤其是异线程能源占用的今后处理进度。用onFinalize取代了onComplete

贰 、流成分扩充了节组类型和连锁的操作方法

大家在前方提到过fs2使用了崭新的点子和数据类型来贯彻transducer。transducer的项目是Pipe,即:

我们在后边提到过fs2使用了崭新的措施和数据类型来贯彻transducer。transducer的类别是Pipe,即:

当今采纳myTake和myFilter就不必要pure升格了:

我们早已提过fs2作用升级的内部一项是充实了节组数据类型和相关的操作函数。Chunk是fs2内部使用的一种集合,那样fs2就足以一节一节(by
chunks)来拍卖数据了。Chunk本身装有了整机的成团函数:

1 s3.through(myFilter(_ % 2 == 0)).through(myTake(3)).runLog.unsafeRun
2                                                   //> res23: Vector[Int] = Vector(2, 2, 2)
implicit class HandleOps[+F[_],+A](h: Handle[F,A]) {    def push[A2>:A](c: Chunk[A2])(implicit A2: RealSupertype[A,A2]): Handle[F,A2] =      self.push(h: Handle[F,A2])    def push1[A2>:A](implicit A2: RealSupertype[A,A2]): Handle[F,A2] =      self.push1(h: Handle[F,A2])    def #:[H]: Step[H, Handle[F,A]] = Step    def await: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.await    def await1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.await1    def awaitNonempty: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = Pull.awaitNonempty    def echo1: Pull[F,A,Handle[F,A]] = Pull.echo1    def echoChunk: Pull[F,A,Handle[F,A]] = Pull.echoChunk    def peek: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.peek    def peek1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.peek1    def awaitAsync[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):      Pull[F2, Nothing, AsyncStep[F2,A2]] = self.awaitAsync(Sub1.substHandle    def await1Async[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):      Pull[F2, Nothing, AsyncStep1[F2,A2]] = self.await1Async(Sub1.substHandle    def covary[F2[_]](implicit S: Sub1[F,F2]): Handle[F2,A] = Sub1.substHandle  }  implicit class HandleInvariantEffectOps[F[_],+A](h: Handle[F,A]) {    def invAwait1Async[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):      Pull[F, Nothing, AsyncStep1[F,A2]] = self.await1Async    def invAwaitAsync[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):      Pull[F, Nothing, AsyncStep[F,A2]] = self.awaitAsync    def receive1[O,B](f: Step[A,Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await1.flatMap    def receive[O,B](f: Step[Chunk[A],Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await.flatMap  }

fs2是依据Stream的营造批次来分节的。大家来演示一下哪些选拔Pull的Chunk机制:

class Pull[+F[_],+O,+R](private[fs2] val get: Free[P[F,O]#f,Option[Either[Throwable,R]]])
type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]

def await1[F[_],I]: Handle[F,I] => Pull[F,Nothing,Step[I,Handle[F,I]]] = {...}

def receive1[F[_],I,O,R](f: Step[I,Handle[F,I]] => Pull[F,O,R]): Handle[F,I] => Pull[F,O,R] =
    _.await1.flatMap(f)

def pull[F[_],F2[_],A,B](s: Stream[F,A])(using: Handle[F,A] => Pull[F2,B,Any])(implicit S: Sub1[F,F2])
  : Stream[F2,B] =
    Pull.close { Sub1.substPull(open(s)) flatMap (h => Sub1.substPull(using(h))) }

在地点的例证里我们使用了through,to等连接函数。由于数量最后发送到终点stdOut,我们不要用runLog来记录运算结果。

纯数据流具备了不胜枚举与List相似的操作函数:

 1 (Stream(1,2,3) ++ Stream(4,5)).toList             //> res5: List[Int] = List(1, 2, 3, 4, 5) 2 Stream(1,2,3).map { _ + 1}.toList                 //> res6: List[Int] = List 3 Stream(1,2,3).filter { _ % 2 == 0}.toList         //> res7: List[Int] = List 4 Stream(1,2,3).fold(0).toList               //> res8: List[Int] = List 5 Stream(None,Some(1),Some(3),None).collect { 6   case None => 0 7   case Some => i 8 }.toList                                          //> res9: List[Int] = List(0, 1, 3, 0) 9 Stream.range(1,5).intersperse(42).toList          //> res10: List[Int] = List(1, 42, 2, 42, 3, 42, 4)10 Stream(1,2,3).flatMap {x => Stream}.toList   //> res11: List[Int] = List(1, 1, 2, 2, 3, 3)11 Stream(1,2,3).repeat.take(5).toList               //> res12: List[Int] = List(1, 2, 3, 1, 2)

咱俩得以用through来连接那个transducer:

type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]def await1[F[_],I]: Handle[F,I] => Pull[F,Nothing,Step[I,Handle[F,I]]] = {...}def receive1[F[_],I,O,R](f: Step[I,Handle[F,I]] => Pull[F,O,R]): Handle[F,I] => Pull[F,O,R] =    _.await1.flatMapdef pull[F[_],F2[_],A,B](s: Stream[F,A])(using: Handle[F,A] => Pull[F2,B,Any])(implicit S: Sub1[F,F2])  : Stream[F2,B] =    Pull.close { Sub1.substPull flatMap (h => Sub1.substPull(using }

 

⑥ 、通过bracket函数增强了能源利用安全,特别是异线程能源占用的事后处理进度。用onFinalize取代了onComplete

果然在Handle提供的函数里有await,receive等那几个读取函数。我们试着来促成二个简便的transducer:一个filter函数:

type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]

咱俩得以用toList可能toVector来运算纯数据流中的成分值:

再示范另八个Pipe的贯彻:take

③ 、fs2不再只局限于Task一种副功能运算方式(effect)。用户能够提供自身的effect类型

1 (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7)).chunks.toList2     //> res16: List[fs2.Chunk[Int]] = List(Chunk, Chunk, Chunk

上述都以一些骨干的List操作函数示范。

壹 、完全不含任何外部正视(third-party
dependency)

 

myTake和myTakeC发生了分歧的结果。

fs2的多数变换函数都考虑了对Chunk数据的拍卖体制。大家先看看fs2是怎样显示Chunk数据的:

fs2是scalaz-stream的摩登版本,沿用了scalaz-stream被动式(pull
model)数据流原理但选用了全新的落到实处形式。fs2比较scalaz-stream而言具备了:更简洁的根基零部件(combinator)、更安全的花色、能源利用(type
safe, resource
safety)、更高的运算功能。由于fs2基本沿用了scalaz-stream的规律,所以我们会在上边包车型客车研讨里主要介绍fs2的应用。依据fs2的官方文书,fs2具备了以下新的特征:

1 (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7)).chunks.toList
2     //> res16: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4, 5), Chunk(6, 7))

咱俩明白,纯数据流正是scalaz-stream里的Process1,即transducer,是承受对流实行状态转换的。在fs2里transducer正是Pipe(也是channel),大家一般用through来连接transducer。上边示范中的take,filter等都是transducer,我们能够在object
pipe里找到这一个函数:

肆 、更简短的流转换组件(stream
transformation primitives)

那会儿compiler不再出错误音信了。在fs2
pipe对象里的函数通过措施注入或然项目继承变成了Stream的自身函数,所以我们也足以直接在Stream类型上选拔那些transducer:

上述的throughPure等于是through
+
pure。Pure是未曾其余成效的F[_],是专程为支援compiler进行项目推导的门类。其实大家能够用pure先把纯数据流升格后再用through:

fs2是比照Stream的创设批次来分节的。大家来演示一下哪些使用Pull的Chunk机制:

 

上述的throughPure等于是through
+
pure。Pure是一向不别的作用的F[_],是尤其为辅助compiler进行项目推导的档次。其实大家得以用pure先把纯数据流升格后再用through:

此刻compiler不再出错误新闻了。在fs2
pipe对象里的函数通过措施注入大概项目继承变成了Stream的本身函数,所以大家也能够直接在Stream类型上采用那一个transducer:

 1 object pipe { 2 ... 3 /** Drop `n` elements of the input, then echo the rest. */ 4   def drop[F[_],I]: Stream[F,I] => Stream[F,I] = 5     _ pull (h => Pull.drop flatMap Pull.echo) 6 ... 7 /** Emits `true` as soon as a matching element is received, else `false` if no input matches */ 8   def exists[F[_], I](p: I => Boolean): Stream[F, I] => Stream[F, Boolean] = 9     _ pull { h => Pull.forall[F,I] flatMap { i => Pull.output1(!i) }}10 11   /** Emit only inputs which match the supplied predicate. */12   def filter[F[_], I](f: I => Boolean): Stream[F,I] => Stream[F,I] =13     mapChunks(_ filter f)14 15   /** Emits the first input  which matches the supplied predicate, to the output of the returned `Pull` */16   def find[F[_],I](f: I => Boolean): Stream[F,I] => Stream[F,I] =17     _ pull { h => Pull.find.flatMap { case o #: h => Pull.output1 }}18 19 20   /**21    * Folds all inputs using an initial value `z` and supplied binary operator,22    * and emits a single element stream.23    */24   def fold[F[_],I,O] => O): Stream[F,I] => Stream[F,O] =25     _ pull { h => Pull.fold.flatMap(Pull.output1) }26 ...27 /** Emits all elements of the input except the first one. */28   def tail[F[_],I]: Stream[F,I] => Stream[F,I] =29     drop(1)30 31   /** Emit the first `n` elements of the input `Handle` and return the new `Handle`. */32   def take[F[_],I]: Stream[F,I] => Stream[F,I] =33     _ pull Pull.take34 ...

 

 

贰 、流成分扩大了节组(chunk)类型和连锁的操作方法

 1 import scala.language.higherKinds
 2 def myFilter[F[_],A](f: A => Boolean): Pipe[F, A, A] = {
 3   def go(h: Stream.Handle[F,A]): Pull[F,A,Unit] =  {
 4 //      h.receive1 {case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}
 5        h.await1.flatMap { case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}
 6   }
 7 //  sin => sin.open.flatMap {h => go(h)}.close
 8   sin => sin.pull(go _)
 9 }                                   //> myFilter: [F[_], A](f: A => Boolean)fs2.Pipe[F,A,A]
10 
11 Stream.range(0,10).pure.through(myFilter(_ % 2 == 0)).toList
12                                      //> res17: List[Int] = List(0, 2, 4, 6, 8)

 

 

 

大家见到Pipe便是贰个Function1的门类小名,三个lambda:提供一个Stream[F,I],返回Stream[F,O]。那么在fs2里是哪些读取2个Stream[F,I]里的要素呢?我们前面提到是通过二个新的数据结构Pull来完成的,先来看望fs2是何等兑现Stream
>> Pull >> Stream转换的:

上面的例证里彰显了fs2的运算流从源头(Source)到传换(Transducer)一贯到巅峰(Sink)的选拔示范:

我们领略,纯数据流正是scalaz-stream里的Process1,即transducer,是负责对流举行状态转换的。在fs2里transducer正是Pipe(也是channel),大家一般用through来连接transducer。上面示范中的take,filter等都以transducer,大家能够在object
pipe里找到这几个函数:

 

 

 

壹 、完全不含任何外部重视(third-party
dependency)

 

 

捌 、Stream取代了Process。fs第22中学再没有Process一 、Tee、Wye、Channel那么些品种别称,取而代之的是:
  

type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]

 

 

 

大家从Pull里用await1或许receive1把二个Step数据结构从Handle里扯(pull)出来然后再output到Pull结构里。把这一个Pull
close后得到大家需求的Stream。咱们把例子使用的品种及函数款式陈列在上面:

1 Stream()                       //> res0: fs2.Stream[Nothing,Nothing] = Segment(Emit(Chunk()))
2 Stream(1,2,3)                  //> res1: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
3 Stream.emit(4)                 //> res2: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(4)))
4 Stream.emits(Seq(1,2,3))       //> res3: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
 1 def stdOut: Sink[Task,String]  =
 2   _.evalMap { x => Task.delay{ println(s"milli: $x")}}
 3                                                   //> stdOut: => fs2.Sink[fs2.Task,String]
 4 Stream.repeatEval(Task.delay{System.currentTimeMillis})
 5   .map(_.toString)
 6   .through(myTake(3))
 7   .to(stdOut)
 8   .run.unsafeRun                                  //> milli: 1472001934708
 9                                                   //| milli: 1472001934714
10                                                   //| milli: 1472001934714

下边大家来看看fs2的局地基本操作:

 

 

在Pull的项目参数中F是三个运算,O代表输出元素类型,RAV4代表Pull里的多少能源。大家得以从Lacrosse读取成分。在上头的例子里pll的PAJERO值是个Handle类型。那些项目里应该提供了读取成分的法子:

fs2的一艺之长应该是八线程编程了。在Stream的类型款式中:Stream[F[_],A],F[_]是一种恐怕爆发副成效的运算形式,当F[_]等于Nothing时,Stream[Nothing,A]是一种纯数据流,而Stream[F[_],A]就是一种运算流了。我们得以在对运算流举市价况转换的经过中开始展览演算来落到实处F的副效能如:数据库读写、IO操作等。fs2不再绑定Task一种运算格局了。任何有Catchable实例的Monad都能够成为Stream的运算格局。不过,作为一种以八线程编制程序为着力的工具库,没有啥样运算方式会比Task更妥当了。
咱俩得以把四个纯数据流升格成运算流:

柒 、stream状态转换接纳了全新的落到实处格局,使用了新的数据结构:Pull

相关文章