Streaming könyvtár nagyhatalommal: FS2 és funkcionális programozás

A Scala rendelkezik egy nagyon különleges FS2 (Funkcionális adatfolyamok a Scala számára) nevű streaming könyvtárral. Ez a könyvtár a funkcionális programozás (FP) összes előnyét testesíti meg. A tervezési célok megértésével megismerheti azokat az alapvető ötleteket, amelyek az FP-t annyira vonzóvá teszik.

Az FS2-nek egy központi típusa van: Stream[Effect,Output]

Lehet, hogy ettől a típustól azt kapja, hogy ez egy, Streamés hogy típusú értékeket bocsát ki Output.

A nyilvánvaló kérdés itt az, hogy mi van Effect? Mi a kapcsolat a Effectés között Output? És milyen előnyei vannak az FS2-nek más streaming könyvtárakkal szemben?

Áttekintés

Először áttekintem, hogy az FS2 milyen problémákat old meg. Ezután összehasonlítom Listés Streamszámos kód példával. Ezt követően arra fogok összpontosítani, hogy hogyan lehet használni Streamegy DB-vel vagy bármely más IO-val. Itt ragyog az FS2, és ahol a Effecttípust használják. Miután megértette, mi Effectvan, a funkcionális programozás előnyeinek nyilvánvalónak kell lennie az Ön számára.

A bejegyzés végén a következő kérdésekre kap választ:

  • Milyen problémákat tudok megoldani az FS2-vel?
  • Mit tehetek azzal, Streamhogy Listnem?
  • Hogyan adhatok adatokat egy API-ból / File / DB-ből a-ba Stream?
  • Mi ez a Effecttípus és hogyan kapcsolódik a funkcionális programozáshoz?

Megjegyzés: A kód Scalában található, és érthetőnek kell lennie a szintaxis előzetes ismerete nélkül is.

Milyen problémákat tudok megoldani az FS2-vel?

  1. Streaming I / O: Inkrementálisan nagy adathalmazok betöltése, amelyek nem férnek el a memóriában, és azok működtetése a halom fújása nélkül.
  2. Vezérlési folyamat (nincs lefedve): Az adatok egy / több DB-ből / fájlból / API-ból történő mozgatása szép, deklaratív módon.
  3. Párhuzamosság (nincs lefedve): Futtasson párhuzamosan különböző folyamokat, és késztesse őket egymásra. Például több fájlból tölthet fel adatokat, és egyidejűleg dolgozhatja fel őket, szemben a szekvenciális adatokkal. Itt fejlett dolgokat tehet. Streams képesek egymással kommunikálni során a feldolgozási fázisban, és nem csak a végén.

List vs. Stream

Lista legismertebb és leggyakrabban használt adatstruktúra. Ahhoz, hogy megérezzük, miben különbözik az FS2-től Stream, át kell néznünk néhány felhasználási esetet. Meglátjuk, hogyan Streamlehet megoldani azokat a problémákat, Listamelyek nem.

Adatai túl nagyok, és nem férnek el a memóriában

Tegyük fel, hogy nagyon nagy fájlja van (40 GB) fahrenheit.txt. A fájlnak minden sorában van egy hőmérséklete, amelyet konvertálni szeretne celsius.txt.

Nagy fájl betöltése a List

import scala.io.Source val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList java.lang.OutOfMemoryError: Java heap space java.util.Arrays.copyOfRange(Arrays.java:3664) java.lang.String.(String.java:207) java.io.BufferedReader.readLine(BufferedReader.java:356) java.io.BufferedReader.readLine(BufferedReader.java:389)

Listcsúnyán elbukik, mert természetesen a fájl túl nagy ahhoz, hogy elférjen a memóriában. Ha kíváncsi vagy, itt megnézheted a teljes megoldást Stream- de később tedd, olvass tovább :)

Amikor a List nem fog sikerülni ... Patakként segíts!

Tegyük fel, hogy sikerült elolvasnom a fájlomat, és vissza akarom írni. Szeretném megőrizni a vonalszerkezetet. \nMinden hőmérséklet után új vonalas karaktert kell beillesztenem .

intersperseEhhez használhatom a kombinátort

import fs2._ Stream(1,2,3,4).intersperse("\n").toList

Egy másik kedves zipWithNext

scala> Stream(1,2,3,4).zipWithNext.toList res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))

Az egymást követő dolgokat együtt csomagolja, nagyon hasznos, ha el akarja távolítani az egymást követő duplikátumokat.

Ez csak néhány a nagyon hasznosak közül, itt van a teljes lista.

Nyilvánvalóan Streamsok mindent Listmegtehet , ami nem lehetséges, de a legjobb funkció a következő szakaszban jelenik meg, ez arról szól, hogy hogyan lehet Streama való világban használni a DB-ket / fájlokat / API-kat ...

Hogyan adhatok adatokat egy API-ból / File / DB-ből a-ba Stream?

Mondjuk most, hogy ez a programunk

scala> Stream(1,2,3) res2: fs2.Stream[fs2.Pure,Int] = Stream(..)

Mit jelent ez Pure? Itt van a scaladoc a forráskódból:

/** * Indicates that a stream evaluates no effects. * * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`. */ type Pure[A] <: Nothing

Ez azt jelenti, hogy nincs hatás, ok…, de mi a hatás? és pontosabban mi a programunk hatása Stream(1,2,3)?

Ez a program szó szerint nincs hatással a világra. Az egyetlen hatása az lesz, hogy a CPU működőképes és fogyaszt némi energiát !! Nem érinti a körülötted lévő világot.

A világ befolyásolásán azt értem, hogy bármilyen értelmes erőforrást felemészt, például egy fájlt, egy adatbázist, vagy bármi hasonlót állít elő , mint egy fájl, feltölt néhány adatot valahova, ír a termináljára stb.

Hogyan lehet az Pureadatfolyamot hasznosra fordítani ?

Tegyük fel, hogy a felhasználói azonosítókat egy DB-ről szeretném betölteni, megkapom ezt a függvényt, meghívja a DB-t és a userId-t adja vissza a-ként Long.

import scala.concurrent.Future def loadUserIdByName(userName: String): Future[Long] = ???

Visszaadja a-t, Futureamely azt jelzi, hogy ez a hívás aszinkron, és az érték a jövőben egy bizonyos pillanatban elérhető lesz. Becsomagolja a DB által visszaadott értéket.

Megvan ez a Purepatak.

scala> val names = Stream("bob", "alice", "joe") names: fs2.Stream[fs2.Pure,String] = Stream(..)

Hogyan szerezhetek egy Streamazonosítót?

A naiv megközelítés a mapfüggvény használata lenne , a függvényt az egyes értékekhez kell futtatnia Stream.

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

Még mindig visszakaptam a Pure! Adtam Streamegy olyan funkciót, amely hatással van a világra, és még mindig kaptam egy Pure, nem klassz ... Szép lett volna, ha az FS2 automatikusan észleli, hogy a loadUserIdByNamefunkció hatással van a világra, és adott nekem valamit, ami NEM, Purede mégis nem így működik. Speciális kombinátort kell használnia, nem pedig map: használni kell evalMap.

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

Nincs többé Pure! kaptunk Futurehelyette, igen! Mi történt?

Tartott:

  • loadUserIdByName: Future[Long]
  • Stream[Pure, String]

És átkapcsolta a folyam típusait

  • Stream[Future, Long]

Elválasztotta Futureés elszigetelte! A bal oldali Effecttípusparaméter most a betontípus Future.

Szép trükk, de hogyan segít nekem?

Most tanúja voltál az aggodalmak valódi elválasztásának. Folytathatja az adatfolyam működését az összes olyan kedves Listkombinátorral, és nem kell aggódnia, ha a DB nem működik, lassú vagy a hálózati (effektus) problémákkal kapcsolatos dolgok.

Mindez addig működik, amíg nem akarom felhasználni toListaz értékek visszaszerzésére

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

Mit???!!! Esküszhetnék arra, hogy toListkorábban használtam , és bevált, hogyan mondhatnám, hogy toListmár nem tagja fs2.Stream[Future,String]? Olyan, mintha ezt a funkciót abban a pillanatban eltávolították volna, amikor elkezdtem használni az effekt-teljes adatfolyamot. De hogyan hozhatom vissza az értékeimet?

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

Először használjuk compile, hogy elmondja az Streamkombinálni az összes hatást egyetlen, ténylegesen kihajtódás összes hívás loadUserIdByNameegyetlen nagy Future. Erre szükség van a keretrendszer számára, és hamarosan kiderül, miért van erre szükség.

Most toListműködnie kell

scala> userIdsFromDB.compile.toList :18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future] userIdsFromDB.compile.toList ^

Mit?! a fordító továbbra is panaszkodik. Ez azért Futurevan, mert ez nem jó Effecttípus - ez megtöri az aggodalmak elkülönítésének filozófiáját, amint azt a következő nagyon fontos szakasz elmagyarázza.

FONTOS: Az egyetlen dolog, amit el kell venni erről a bejegyzésről

Kulcsfontosságú szempont itt az, hogy a DB-t még nem hívták meg. Semmi nem történt igazán, a teljes program nem hoz semmit.

def loadUserIdByName(userName: String): Future[Long] = ??? Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile

A programleírás elkülönítése az értékeléstől

Igen, meglepő lehet, de az FP fő témája a

  • Description of your program: a good example is the program we just wrote, it’s a pure description of the problem “I give you names and a DB, give me back IDs”

And the

  • Execution of your program: running the actual code and asking it to go to the DB

One more time our program has literally no effect on the world besides making your computer warm, exactly like our Pure stream.

Code that does not have an effect is called pure and that’s what all Functional Programming is about: writing programs with functions that are pure. Bravo, you now know what FP is all about.

Why would you want write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.

Now let’s fix our program and take care of this Future problem.

As we said Future is a bad Effect type, it goes against the separation of concerns principle. Indeed, Future is eager in Scala: the moment you create one it starts to executes on some thread, you don't have control of the execution and thus it breaks. FS2 is well aware of that and does not let you compile. To fix this we have to use a type called IO that wraps our bad Future.

That brings us to the last part, what is this IO type? and how do I finally get my list of usedIds back?

scala> import cats.effect.IO import cats.effect.IO scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList res8: cats.effect.IO[List[Long]] = IO$2104439279

It now gives us back a List but still, we didn't get our IDs back, so one last thing must be missing.

What does IO really mean?

IO comes from cats-effect library. First let's finish our program and finally get out the ids back from the DB.

scala> userIds.compile.toList.unsafeRunSync :18: error: not found: value userIds userIds.compile.toList.unsafeRunSync ^

The proof that it’s doing something is the fact that it’s failing.

loadUserIdByName(userName: String): Future[Long] = ???

When ??? is called you will get this exception, it means the function was executed (as opposed to before when we made the point that nothing was really happening). When we implement this function it will go to the DB and load the ids, and it will have an effect on the world (network/files system).

IO[Long] is a description of how to get a value of type Long and it most certainly involves doing some I/O i.e going to the network, loading a file,...

It’s the How and not the What. It describes how to get the value from the network. If you want to execute this description, you can use unsafeRunSync (or other functions prefixed unsafe). You can guess why they are called this way: indeed a call to a DB is inherently unsafe as it could fail if, for example, your Internet connection is out.

Recap

Let’s take a last look at Stream[Effect,Output].

Output is the type that the stream emits (could be a stream of String, Long or whatever type you defined).

Effect is the way (the recipe) to produce the Output (i.e go to the DB and give me an id of type Long).

It’s important to understand that if these types are separated to make things easier, breaking down a problem in subproblems allows you to reason about the subproblems independently. You can then solve them and combine their solutions.

The link between these 2 types is the following :

In order for the Stream to emit an element of type

  • Output

It needs to evaluate a type

  • Effect

A special type that encodes an effective action as a value of type IO, this IO value allows the separation of 2 concerns:

  • Description:IO is a simple immutable value, it’s a recipe to get a type A by doing some kind of IO(network/filesystem/…)
  • Execution: in order forIO to do something, you need to execute/run it using io.unsafeRunSync

Putting it all together

Stream[IO,Long] says:

This is a Stream that emits values of type Long and in order to do so, it needs to run an effective function that producesIO[Long] for each value.

That’s a lot of details packed in this very short type. The more details you get about how things happen the fewer errors you make.

Takeaways

  • Stream is a super charged version of List
  • Stream(1,2,3) is of type Stream[Pure, Int] , the second type Int is the type of all values that this stream will emit
  • Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
  • Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream.
  • Stream[IO, Long] separates the concerns of What and How by letting you work only with the values and not worrying about how to get them (loading from the db).
  • Separating program description from evaluation is a key aspect of FP.
  • All the programs you write with Stream will do nothing until you use unsafeRunSync. Before that your code is effectively pure.
  • IO[Long] is an effect type that tells you: you will get Long values from IO (could be a file, the network, the console ...). It's a description and not a wrapper!r
  • Future does not abide by this philosophy and thus is not compatible with FS2, you have to use IO type instead.

FS2 videos

  • Hands on screencast by Michael Pilquist: //www.youtube.com/watch?v=B1wb4fIdtn4
  • Talk by Fabio Labella //www.youtube.com/watch?v=x3GLwl1FxcA