środa, 17 czerwca 2015

Introduction to Big Data with Apache Spark - Week 3

Tematem obecnego tygodnia obecnego tygodnia było to co każdy powtarza sobie przed rozmową kwalifikacyjną czyli left i right joiny. Oczywiście całość na nowych, przebojowych i rewolucyjnych dataframe (datafrejmach,dataframeach,dataframesach?)

Laborki miały raczej mało wspólnego z wykładem bo zadaniem była analiza logów NASA przy pomocy RDD.

Same joiny są lepsze (chyba) od tych znanych z bazy danych bo zamiast nulli dostajemy Some i None

//Dane
scala> val data1=List((1,"a"),(2,"b"))
scala> val data2=List((1,"c"),(3,"d"))

//rdd
scala> val rdd1=sc.parallelize(data1)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23


scala> val rdd2=sc.parallelize(data2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at parallelize at <console>:23


//inner join
rdd1.join(rdd2).collect()
res2: Array[(Int, (String, String))] = Array((1,(a,c)))

//left outer join
scala> rdd1.leftOuterJoin(rdd2).collect()
res4: Array[(Int, (String, Option[String]))] = Array((1,(a,Some(c))), (2,(b,None)))

//right outer join
scala> rdd1.rightOuterJoin(rdd2).collect()
res3: Array[(Int, (Option[String], String))] = Array((1,(Some(a),c)), (3,(None,d)))

https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.PairRDDFunctions

Poprzednie tygodnie

Tutaj linki do poprzednich tygodni - podobno można jeszcze zacząć :

Uwagi i Wnioski

Pytanie czy zrobić to w formie suchych faktów czy ciekawego opowiadania - ciekawe opowiadanie jest ciekawe ale fakty są suche a podobno ma padać cały tydzień także :.

  • To jest dobra strona do obadania operacji na RDD, który przechowuje (klucz, wartość) => Key-Value RDD
  • Odczytywanie plików binarnych jest znacznie szybsze niż plików tekstowych - a scala jest generalnie szybsza od Pythona. I tu nie chodzi o to żeby coś hejtować ale według obrazka no generlanie jest szybsza :
  • Jest taki zajebisty algorytm kompresji - https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)
  • I ten LZ4 jest taki dobry, że jeszcze jeden link : https://code.google.com/p/lz4/

I taki bonus. Można zarejestrować własna funkcję, która zrobi jakieś customowe operacje na DataFrame :

val isEmployeFun = bcEmployees.value contains _

val isEmployee: UserDefinedFunction = sqlContext.udf.register("isEmployee", isEmployeFun)

val filtered = ordered.filter(isEmployee($"login"))


I od razu drugi bonus - co to jest bcEmployees? To średnio czytelnie nazwana zmienna z ćwiczenia ale zmienna(a w zasadzie stała) nie zwyczajna gdyż rozdystrybuowana
val bcEmployees = sc.broadcast(employees)

//a ten jednolinijkowiec to efekt lektury "functional programming in scala" 
//- tak, niektorych ludzi może zabić ale to tylko ćwiczenie
val isEmployeFun = bcEmployees.value contains _

Laborki

Na laborkach trzeba było nauczyć się regular expressions - podejście numer 269.

val pattern="""^(\S+) (\S+)""".r
val log="127.0.0.1 /GET"
pattern findAllIn log next

Performance

Zestaw suchych faktów na tydzień bieżący :

W zasadzie zawsze kiedy jest RDD typu (klucz,wartość) trzeba używać reduceByKey - wtedy mnie szuflowania po kablu jest.

A jak sie kilka razy używa jakiegoś RDD to trzba robić cache()

Ciekawe błędy

It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. - tutaj błąd polegał na tym, ze użyłem rdd.map(coś_tam_od_rdd) - a to co jest w map jest wysyłąne do workerów - czyli ogólnie bez sensu.

Testy przechodziły, ale po submicie zadania : Timeout error happened during grading. Please review your code to be more efficient and submit the code again.
Co tu można było zrobić :

  • Generalnie chodzi o to aby używać specjalizowanych funkcji zamiast tych bardziej ogólnych co (znowu chyba) da Sparkowi hinta, które dane szufffffeling.
  • reduceByKeys,sortByKeys,mapValues
  • A jak się robi groupBy a później reduce to zazwyczaj jest to suboptymalne (tego nauczyłem się kiedyś w korpo - jak ktoś zjebie to mówi "we have reached a suboptimal state")

Brak komentarzy:

Prześlij komentarz