czwartek, 11 czerwca 2015

Introduction to Big Data with Apache Spark - Week 2

Tydzień pierwszy opisany tutaj ~~~~> http://pawelwlodarski.blogspot.com/2015/06/introduction-to-big-data-with-apache.html

W końcu pojawiła się jakaś definicja BigData, którą rozumiem. BigData to nie jest wtedy jak ma się XBajtów ale wtedy jak "Ilość danych rośnie szybciej niż zdolność obliczeniowa jednostki"

Lekcja 3 - jak powstał spark

Grupa naukowców pracowała z hadoopem i zaczęła się zastanawiać dlaczego każda iteracja w pętli for zapisuje wynik na dysku sieciowym. Po chwili doszli do wniosku, ze skoro są profesorami to napiszą coś fajnego, wzięli scalę, tez stworzoną i przez profesora i zrobili prawdziwie profesorskie narzędzie.

Spark ma zastąpić hadoopa ale i tak książka "Hadoop in action jest planowana na przełom tego roku" - (mam nadzieję, że to nie będzie utopione 37 złotych.)

Papiery naukowe,Prace naukowe, white papers...

W kursie znalazły się dwa linki do prac naukowych o Sparku i RDD. Fajnie poczytać czasem prace naukowe gdyż są dokładne i ciekawe.

Link pierwszy : http://usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf

Opisuje RDD jako takie

  • Spark przy algorytmach iteracyjnych działa trzynaście trylionów razy szybciej niż hadoop
  • Jest opisane jak dokładnie działa RDD, że jest lazy i co to daje, że łatwo odtworzyć w razie awarii i co to daje

Pierwszy cytat :

"Spark is up to 20× faster than Hadoop for iterative applications, speeds up a real-world data analytics report by 40×, and can be used interactively to scan a 1 TB dataset with 5–7s latency"

"Finally, users can control two other aspects of RDDs: persistence and partitioning."

I chyba w tym momencie zrozumiałem ogromną zaletę tego narzędzia.

"Users can indicate which RDDs they will reuse and choose a storage strategy for them (e.g., in-memory storage). They can also ask that an RDD’s elements be partitioned across machines based on a key in each record. This is useful for placement optimizations, such as ensuring that two datasets that will be joined together are hash-partitioned in the same way."

Link drugi : http://people.csail.mit.edu/matei/papers/2010/hotcloud_spark.pdf

Opisuje Sparka jako implementację RDD.

"We have implemented Spark in about 14,000 lines of Scala."

First law of software quality: e=mc^2, errors = (more code)^2 - wiele osób błędnie chwali się, że ich projekt jest wielki (to akurat prawda ale sami są tego winni) i przez to "bardziej profesjonalny" - tak jak bardziej kręte drogi są bardzo profesjonalne.

""The cache action leaves the dataset lazy, but hints that it should be kept in memory after the first time it is computed, because it will be reused"

"The save action evaluates the dataset and writes it to a distributed filesystem such as HDFS. The saved version is used in future operations on it"

A te dwa były o tym, że spark jest lazy i jak to tam działa.

Komunikacja master<--> worker

Są dwa Fajne typy zmiennych :

Broadcast variables

Z tego co zrozumiałem funkcje operujące na RDD są za każdym razem wysyłane do workera i czasem jakaś zmienna może się zaplątać w closure. A ten tutaj wynalazek sprawia, że zostanie wysłana raz tylko jeden.
val broadcastVar=sc.broadcast(7)
broadcastVar: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(12)

scala> val rdd=sc.parallelize(Seq(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21


scala> rdd.map(_+1).collect
res16: Array[Int] = Array(2, 3, 4, 5)


scala> rdd.map(_+broadcastVar.value).collect
res18: Array[Int] = Array(8, 9, 10, 11)

Accumulators

O ile lambda leci w kierunku driver --> worker to accumulator lecu worker --> driver. Albo inaczej worker to widzi jako write-only a driver jako read-only.
scala> val acc=sc.accumulator(0)
acc: org.apache.spark.Accumulator[Int] = 0

scala> rdd.foreach(acc+=_)

scala> acc.value
res20: Int = 10
I ponieważ transformacje mogą być wykonywane wielokrotnie bo albo coś padnie albo działa za wolno - to akumulatorów tam nie można stosować - jedynie w akcjach.

Kiedy keszować

val lines=sc.textFile
val filtered=lines.filter()
lines.count()
filtered.count()

I powyżej lines będzie ewaluowane dwa razy - ale w przykładzie poniżej już nie!:

val lines=sc.textFile
lines.cache
val filtered=lines.filter()
lines.count()
filtered.count()

Code quality

I tutaj taka ciekawa rzecz :

# Define a function to filter a single value
def ten(value):
    if (value < 10):
        return True
    else:
        return False
# The ten function could also be written concisely as: def ten(value): return value < 10

# Pass the function ten to the filter transformation
# Filter is a transformation so no tasks are run
filteredRDD = subRDD.filter(ten)
To jest kod z przykładów na kursie. I to jest bez sensu bo if(cond) true else false można zwyczajnie zastąpić cond - no to widać, ze profesor nie profesor - sonara trzeba włączyć (chociaż pewnie i tak dali do przygotowania te ćwiczenia jakiemuś laborantowi)

Laborki

Bardzo proszą aby nie publikować zadań to nie będę. Takie ciekawe fakty tylko

  • SPARK UI - http://localhost:4040/jobs/
  • http://localhost:4040/storage/ - tutaj widać wszystkie keszowane i persistowane rdd

Można nazywać sobie RDD by było łatwiej skumać w logach co się dzieje

scala> rdd.setName("zbior ku*wa")
res28: rdd.type = zbior ku*wa ParallelCollectionRDD[0] at parallelize at <console>:21

scala> rdd.toDebugString
res30: String = 
(1) zbior ku*wa ParallelCollectionRDD[0] at parallelize at <console>:21 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 1; MemorySize: 96.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B

Pattern Matching na tupla w Pythonie

Można tak :

In [3]: tup=(1,'a')
lam=lambda (x,y): y
In [8]: lam(tup)
Out[8]: 'a'

Brak komentarzy:

Prześlij komentarz