Tanulságok a Wikipédia Apache Spark alkalmazásával történő feldolgozása során

Az Apache Spark egy nyílt forráskódú hibatűrő fürtszámítási keretrendszer, amely támogatja az SQL elemzést, a gépi tanulást és a grafikonfeldolgozást is.

Úgy működik, hogy felosztja az adatait partíciókra, majd ezeket a partíciókat párhuzamosan dolgozza fel a fürt összes csomópontján. Ha bármelyik csomópont lemegy, akkor az adott csomópont feladatát áthelyezi egy másik csomópontra, és így hibatűrést biztosít.

Ha 100-szor gyorsabb, mint a Hadoop, rendkívül népszerűvé tette a Big Data feldolgozását. A Spark Scalában íródott és a JVM-en fut, de jó hír, hogy API-kat is biztosít Python és R, valamint C # számára. Jól dokumentált példákkal, amelyeket érdemes megnéznie.

Ha készen áll arra, hogy kipróbálja, ez a cikk a letöltéstől és a telepítéstől a teljesítményhangolásig vezet. Apró Spark klaszterem 100 millió húrmeccset adott elő a Wikipédia összes cikkén - kevesebb, mint két óra alatt.

Amikor túlhalad az oktatóanyagokon, és komoly munkát végez, akkor rájön a használt technikai verem összes problémájára. A hibák révén történő tanulás a legjobb módszer a tanuláshoz. De néha csak kevés az idő, és azt szeretné, ha tudna minden lehetséges dolgot, ami rosszul fordulhat elő.

Itt ismertetem azokat a problémákat, amelyekkel szembesültem, amikor a Spark-tal kezdtem, és hogyan kerülheted el ezeket.

Hogy kezdjed

Töltse le a Spark bináris csomagot, amely csomagolt Hadoop-függőségekkel jár

Ha nekilátott a Spark letöltésének, észreveszi, hogy ugyanazon verzióhoz különféle bináris fájlok állnak rendelkezésre. A Spark azt hirdeti, hogy nincs szüksége Hadoopra, ezért letöltheti a felhasználó által biztosított kisebb méretű verziót. Ne csináld .

Bár a Spark nem használja a Hadoop MapReduce keretrendszerét, függ más Hadoop könyvtáraktól, például a HDFS-től és a YARN-tól. A hadoop nélküli verzió akkor használható, amikor már máshol is rendelkezésre állnak Hadoop-könyvtárak.

Használja az önálló fürt módot, ne Mesos vagy YARN

Miután tesztelte a beépített példákat a localfürtön, és megbizonyosodott arról, hogy minden megfelelően van telepítve és megfelelően működik, folytassa a fürt beállításával.

A Spark három lehetőséget kínál: Mesos, YARN és önálló.

Az első kettő erőforrás-allokátor vezérli a replika csomópontokat. A Sparknak fel kell kérnie őket, hogy osszák meg saját példányait. Kezdőként ne növelje komplexitását azzal, hogy így jár.

Az önálló fürtöt a legkönnyebb beállítani. Ésszerű alapértelmezésekkel jár, például, ha az összes magját végrehajtóknak használja. Magának a Spark disztribúciónak a része, és van egy sbin/start-all.shszkriptje, amely fel tudja hozni az elsődleges, valamint conf/slavesaz ssh használatával felsorolt ​​összes másolatát .

A Mesos / YARN különálló programok, amelyeket akkor használnak, amikor a fürtje nem csupán egy szikrafürt. Ezenkívül nem járnak ésszerű alapértelmezett értékekkel: a végrehajtók nem használják minden magot a replikákon, hacsak kifejezetten nincs megadva.

A Zookeeper használatával lehetősége van magas rendelkezésre állású üzemmódra is, amely az elsődleges meghibásodások esetére az elsődleges biztonsági mentések listáját tartja. Ha Ön kezdő, akkor valószínűleg nem kezel egy ezer csomópontos fürtöt, ahol a csomópont meghibásodásának kockázata jelentős. Nagyobb valószínűséggel hoz létre fürtöt egy olyan felügyelt felhőalapú platformon, mint az Amazon vagy a Google, amely már gondoskodik a csomópontok meghibásodásáról.

Nem szükséges magas rendelkezésre állás felhőinfrastruktúrával vagy egy kis klaszterrel

A klaszteremet egy ellenséges környezetben állítottam fel, ahol az emberi tényezők voltak felelősek az áramkimaradásokért, és a csomópontok lemennek a hálózatról. (Alapvetően az egyetemi számítógépes laboratóriumom, ahol a szorgalmas hallgatók kikapcsolják a gépet, és a gondatlan hallgatók kihúzzák a LAN-kábeleket). Az elsődleges csomópont körültekintő megválasztásával továbbra is magas rendelkezésre állás nélkül tudtam elindulni. Nem kellene aggódnia emiatt.

Ellenőrizze a Spark futtatásához használt Java verziót

Az egyik nagyon fontos szempont a Java verzió, amelyet a Spark futtatásához használ. Normális esetben a Java későbbi verziója működik valamivel, amelyet a régebbi kiadásokhoz fordítottak össze.

De a Project Jigsaw segítségével a modularitás szigorúbb elszigeteltséget és határokat vezetett be a Java 9-ben, ami megtör bizonyos dolgokat, amelyek reflektálást használnak. A Java 9-en futó Spark 2.3.0-on illegális reflexió-hozzáférést kaptam. A Java 8-nak nem volt problémája.

Ez a közeljövőben mindenképpen megváltozik, de ezt addig tartsa szem előtt.

Pontosan adja meg az elsődleges URL-t úgy, ahogy van. Ne oldja meg a tartományneveket IP-címekké vagy fordítva

Az önálló fürt nagyon érzékeny az elsődleges és a replika csomópontok feloldására használt URL-ekre. Tegyük fel, hogy az elsődleges csomópontot az alábbiak szerint indítja:

> sbin/start-master.sh 

és az elsődleges ideje fent van localhost:8080

Alapértelmezés szerint a számítógép gazdagépnevét választják elsődleges URL-címként. x360úgy határoz, hogy localhostcsak kiindulási replika, mint alább nem működik.

# does not work > sbin/start-slave.sh spark://localhost:7077 
# works > sbin/start-slave.sh spark://x360:7077

Ez működik, és a replikánk hozzá lett adva a fürthöz:

A replikánknak van egy IP címe a 172.17.xx aldomainben, ami valójában a Docker által a gépemen beállított aldomain.

Az elsődleges azért tud kommunikálni ezzel a replikával, mert mindkettő ugyanazon a gépen van. De a replika nem tud kommunikálni a hálózat más másolataival, vagy egy másik gép elsődleges változatával, mert az IP-címe nem irányítható.

A fenti elsődleges esethez hasonlóan az elsődleges nélküli gép másolata felveszi a gép gazdagépnevét. Ha azonos gépei vannak, mindegyik végül ugyanazt a hosztnevet használja, mint a címük. Ez totális rendetlenséget okoz, és senki sem kommunikálhat a másikkal.

Tehát a fenti parancsok a következőkre változnának:

# start master> sbin/start-master.sh -h $myIP # start slave > sbin/start-slave.sh -h $myIP spark://:7077 # submit a job > SPARK_LOCAL_IP=$myIP bin/spark-submit ...

hol myIPvan a gép IP-címe, amely a fürt csomópontjai között irányítható. Valószínűbb, hogy az összes csomópont ugyanazon a hálózaton van, így írhat egy parancsfájlt, amely myIPminden gépen beállítódik .

# assume all nodes in the 10.1.26.x subdomain [email protected]:~$ myIP=`hostname -I | tr " " "\n" | grep 10.1.26. | head`

A kód folyamata

So far we have set up our cluster and seen that it is functional. Now its time to code. Spark is quite well-documented and comes with lots of examples, so its very easy to get started with coding. What is less obvious is how the whole thing works which results in some very hard to debug errors during runtime. Suppose you coded something like this:

class SomeClass { static SparkSession spark; static LongAccumulator numSentences; 
 public static void main(String[] args) { spark = SparkSession.builder() .appName("Sparkl") .getOrCreate(); (1) numSentences = spark.sparkContext() .longAccumulator("sentences"); (2) spark.read() .textFile(args[0]) .foreach(SomeClass::countSentences); (3) } static void countSentences(String s) { numSentences.add(1); } (4) }

1 create a spark session

2 create a long counter to keep track of job progress

3 traverse a file line by line calling countSentences for each line

4 add 1 to the accumulator for each sentence

The above code works on a local cluster but will fail with a null pointer exception when run on a multinode cluster. Both spark as well as numSentences will be null on the replica machine.

To solve this problem, encapsulate all initialized states in non-static fields of an object. Use main to create the object and defer further processing to it.

What you need to understand is that the code you write is run by the driver node exactly as is, but what the replica nodes execute is a serialized job that spark gives them. Your classes will be loaded by the JVM on the replica.

Static initializers will run as expected, but functions like main won’t, so static values initialized in the driver won’t be seen in the replica. I am not sure how the whole thing works, and am only inferring from experience, so take my explanation with a grain of salt. So your code now looks like:

class SomeClass { SparkSession spark; (1) LongAccumulator numSentences; String[] args; SomeClass(String[] args) { this.args = args; } public static void main(String[] args){ new SomeClass(args).process(); (2) } void process() { spark = SparkSession.builder().appName("Sparkl").getOrCreate(); numSentences = spark.sparkContext().longAccumulator("sentences"); spark.read().textFile(args[0]).foreach(this::countSentences); (3) } void countSentences(String s) { numSentences.add(1); }}

1 Make fields non static

2 create instance of the class and then execute spark jobs

3 reference to this in the foreach lambda brings the object in the closure of accessible objects and thus gets serialized and sent to all replicas.

Those of you who are programming in Scala might use Scala objects which are singleton classes and hence may never come across this problem. Nevertheless, it is something you should know.

Submit app and dependencies

There is more to coding above, but before that you need to submit your application to the cluster. Unless your app is extremely trivial, chances are you are using external libraries.

When you submit your app jar, you also need to tell Spark the dependent libraries that you are using, so it will make them available on all nodes. It is pretty straightforward. The syntax is:

bin/spark-submit --packages groupId:artifactId:version,...

I have had no issues with this scheme. It works flawlessly. I generally develop on my laptop and then submit jobs from a node on the cluster. So I need to transfer the app and its dependencies to whatever node I ssh into.

Spark looks for dependencies in the local maven repo, then the central repo and any repos you specify using --repositories option. It is a little cumbersome to sync all that on the driver and then type out all those dependencies on the command line. So I prefer all dependencies packaged in a single jar, called an uber jar.

Use Maven shade plugin to generate an uber jar with all dependencies so job submitting becomes easier

Just include the following lines in your pom.xml

   org.apache.maven.plugins maven-shade-plugin  shade      

When you build and package your project, the default distribution jar will have all dependencies included.

As you submit jobs, the application jars get accumulated in the work directory and fill up over time.

Set spark.worker.cleanup.enabled to true in conf/spark-defaults.conf

This option is false by default and is applicable to the stand-alone mode.

Input and Output files

This was the most confusing part that was difficult to diagnose.

Spark supports reading/writing of various sources such as hdfs, ftp, jdbc or local files on the system when the protocol is file:// or missing. My first attempt was to read from a file on my driver. I assumed that the driver would read the file, turn it into partitions, and then distribute those across the cluster. Turns out it doesn’t work that way.

When you read a file from the local filesystem, ensure that the file is present on all the worker nodes at exactly the same location. Spark does not implicitly distribute files from the driver to the workers.

So I had to copy the file to every worker at the same location. The location of the file was passed as an argument to my app. Since the file was located in the parent folder, I specified its path as ../wikiArticles.txt. This did not work on the worker nodes.

Always pass absolute file paths for reading

It could be a mistake from my side, but I know that the filepath made it as is into the textFile function and it caused “file not found” errors.

Spark supports common compression schemes, so most gzipped or bzipped text files will be uncompressed before use. It might seem that compressed files will be more efficient, but do not fall for that trap.

Don’t read from compressed text files, especially gzip. Uncompressed files are faster to process.

Gzip cannot be uncompressed in parallel like bzip2, so nodes spend the bulk of their time uncompressing large files.

It is a hassle to make the input files available on all workers. You can instead use Spark’s file broadcast mechanism. When submitting a job, specify a comma separated list of input files with the --files option. Accessing these files requires SparkFiles.get(filename). I could not find enough documentation on this feature.

To read a file broadcasted with the --files option, use SparkFiles.get(h>) as the pathname in read functions.

So a file submitted as --files /opt/data/wikiAbstracts.txt would be accesed as SparkFiles.get("WikiAbstracts.txt"). This returns a string which you can use in any read function that expects a path. Again, remember to specify absolute paths.

Since my input file was 5GB gzipped, and my network was quite slow at 12MB/s, I tried to use Spark’s file broadcast feature. But the decompression itself was taking so long that I manually copied the file to every worker. If your network is fast enough, you can use uncompressed files. Or alternatively, use HDFS or FTP server.

Writing files also follows the semantics of reading. I was saving my DataFrame to a csv file on the local system. Again I had the assumption that the results would be sent back to the driver node. Didn’t work for me.

When a DataFrame is saved to local file path, each worker saves its computed partitions to its own disk. No data is sent back to the driver

I was only getting a fraction of the results I was expecting. Initially I had misdiagnosed this problem as an error in my code. Later I found out that each worker was storing its computed results on its own disk.

Partitions

The number of partitions you make affects the performance. By default, Spark will make as many partitions as there are cores in the cluster. This is not always optimal.

Keep an eye on how many workers are actively processing tasks. If too few, increase the number of partitions.

If you read from a gzipped file, Spark creates just one partition which will be processed by only one worker. That is also one reason why gzipped files are slow to process. I have observed slower performance with small number of large partitions as compared to a large number of small partitions.

It’s better to explicitly set the number of partitions while reading data.

You may not have to do this when reading from HDFS, as Hadoop files are already partitioned.

Wikipedia and DBpedia

There are no gotchas here, but I thought it would be good to make you aware of alternatives. The entire Wikipedia xml dump is 14GB compressed and 65 GB uncompressed. Most of the time you only want the plain text of the article, but the dump is in MediaWiki markup so it needs some preprocessing. There are many tools available for this in various languages. Although I haven’t used them personally, I am pretty sure it must be a time consuming task. But there are alternatives.

If all you want is the Wikipedia article plaintext, mostly for NLP, then download the dataset made available by DBpedia.

I used the full article dump (NIF Context) available at DBpedia (direct download from here). This dataset gets rid of unwanted stuff like tables, infoboxes, and references. The compressed download is 4.3GB in the turtle format. You can covert it to tsv like so

Similar datasets are available for other properties like page links, anchor texts, and so on. Do check out DBpedia.

A word about databases

I never quite understood why there is a plethora of databases, all so similar, and on top of that people buy database licenses. Until this project I hadn’t seriously used any. I ever only used MySQL and Apache Derby.

For my project I used a SPARQL triple store database, Apache Jena TDB, accessed over a REST API served by Jena Fuseki. This database would give me RDF urls, labels, and predicates for all the resources mentioned in the supplied article. Every node would make a database call and only then would proceed with further processing.

My workload had become IO bound, as I could see near 0% CPU utilization on worker nodes. Each partition of the data would result in two SPARQL queries. In the worst case scenario, one of the two queries was taking 500–1000 seconds to process. Thankfully, the TDB database relies on Linux’s memory mapping. I could map the whole DB into RAM and significantly improve performance.

If you are IO bound and your database can fit into RAM, run it in memory.

I found a tool called vmtouch which would show what percentage of the database directory had been mapped into memory. This tool also allows you to explicitly map any files/directories into the RAM and optionally lock it so it wont get paged out.

My 16GB database could easily fit into my 32 GB RAM server. This boosted query performance by orders of magnitude to 1–2 seconds per query. Using a rudimentary form of database load balancing based on partition number, I could cut down my execution time to half by using 2 SPARQL servers instead of one.

Conclusion

I truly enjoyed distributed computing on Spark. Without it I could not have completed my project. It was quite easy to take my existing app and have it run on Spark. I definitely would recommend anyone to give it a try.

Originally published at siddheshrane.github.io.

Original text