Tematem obecnego tygodnia obecnego tygodnia było to co każdy powtarza sobie przed rozmową kwalifikacyjną czyli left i right joiny. Oczywiście całość na nowych, przebojowych i rewolucyjnych dataframe (datafrejmach,dataframeach,dataframesach?)
Laborki miały raczej mało wspólnego z wykładem bo zadaniem była analiza logów NASA przy pomocy RDD.
Same joiny są lepsze (chyba) od tych znanych z bazy danych bo zamiast nulli dostajemy Some i None
//Dane scala> val data1=List((1,"a"),(2,"b")) scala> val data2=List((1,"c"),(3,"d")) //rdd scala> val rdd1=sc.parallelize(data1) rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23 scala> val rdd2=sc.parallelize(data2) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at parallelize at <console>:23 //inner join rdd1.join(rdd2).collect() res2: Array[(Int, (String, String))] = Array((1,(a,c))) //left outer join scala> rdd1.leftOuterJoin(rdd2).collect() res4: Array[(Int, (String, Option[String]))] = Array((1,(a,Some(c))), (2,(b,None))) //right outer join scala> rdd1.rightOuterJoin(rdd2).collect() res3: Array[(Int, (Option[String], String))] = Array((1,(Some(a),c)), (3,(None,d)))
https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.PairRDDFunctions
Poprzednie tygodnie
Tutaj linki do poprzednich tygodni - podobno można jeszcze zacząć :
- Introduction to Big Data with Apache Spark - Week 1
- Introduction to Big Data with Apache Spark - Week 2
Uwagi i Wnioski
Pytanie czy zrobić to w formie suchych faktów czy ciekawego opowiadania - ciekawe opowiadanie jest ciekawe ale fakty są suche a podobno ma padać cały tydzień także :.
- To jest dobra strona do obadania operacji na RDD, który przechowuje (klucz, wartość) => Key-Value RDD
- Odczytywanie plików binarnych jest znacznie szybsze niż plików tekstowych - a scala jest generalnie szybsza od Pythona. I tu nie chodzi o to żeby coś hejtować ale według obrazka no generlanie jest szybsza :
- Jest taki zajebisty algorytm kompresji - https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)
- I ten LZ4 jest taki dobry, że jeszcze jeden link : https://code.google.com/p/lz4/
I taki bonus. Można zarejestrować własna funkcję, która zrobi jakieś customowe operacje na DataFrame :
val isEmployeFun = bcEmployees.value contains _ val isEmployee: UserDefinedFunction = sqlContext.udf.register("isEmployee", isEmployeFun) val filtered = ordered.filter(isEmployee($"login"))
I od razu drugi bonus - co to jest bcEmployees? To średnio czytelnie nazwana zmienna z ćwiczenia ale zmienna(a w zasadzie stała) nie zwyczajna gdyż rozdystrybuowana
val bcEmployees = sc.broadcast(employees) //a ten jednolinijkowiec to efekt lektury "functional programming in scala" //- tak, niektorych ludzi może zabić ale to tylko ćwiczenie val isEmployeFun = bcEmployees.value contains _
Laborki
Na laborkach trzeba było nauczyć się regular expressions - podejście numer 269.
val pattern="""^(\S+) (\S+)""".r val log="127.0.0.1 /GET" pattern findAllIn log next
Performance
Zestaw suchych faktów na tydzień bieżący :W zasadzie zawsze kiedy jest RDD typu (klucz,wartość) trzeba używać reduceByKey - wtedy mnie szuflowania po kablu jest.
A jak sie kilka razy używa jakiegoś RDD to trzba robić cache()
Ciekawe błędy
It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. - tutaj błąd polegał na tym, ze użyłem rdd.map(coś_tam_od_rdd) - a to co jest w map jest wysyłąne do workerów - czyli ogólnie bez sensu.
Testy przechodziły, ale po submicie zadania : Timeout error happened during grading. Please review your code to be more efficient and submit the code again.
Co tu można było zrobić :
- Generalnie chodzi o to aby używać specjalizowanych funkcji zamiast tych bardziej ogólnych co (znowu chyba) da Sparkowi hinta, które dane szufffffeling.
- reduceByKeys,sortByKeys,mapValues
- A jak się robi groupBy a później reduce to zazwyczaj jest to suboptymalne (tego nauczyłem się kiedyś w korpo - jak ktoś zjebie to mówi "we have reached a suboptimal state")
Brak komentarzy:
Prześlij komentarz