Poniżej materiał do warsztatów uczących używania mechanizmów wielowątkowych dostępnych w Javie aleee programować będziemy w Scali. Kiedy się odbędzie warsztat? Warsztat odbędzie się w momencie jego dobycia. Pewnie w przyszłym miesiącu.
Celem warsztaty będzie zapoznanie uczestników (którzy w założeniu nie stykają się z problemami wielowątkowości na co dzień) z ewolucją którą przeszła Java do wersji 1.7 - (java.util.concurrent, fork-join i takie tam.) Jednocześnie nie zarzucając słuchaczy zbyt wielką listą szczegółów.
Aby upiec dwie pieczenie na jednym ruszcie - będziemy przy okazji uczyć się Scali. To trochę jak nauka angielskiego poprzez tłumaczenie najnowszego wydania "Świata Dorsza" na tenże język.
Na warsztat przygotuję jakieś próbki kodu tak aby uczestnicy nie musieli pisać wszystkiego od początku. Kod będzie kompletny np. w 70% i nauka odbywać będzie się poprzez uzupełnienie brakujących fragmentów z towarzyszącym niezwykle krytycznym procesem wyciągania wniosków.
Po co uczyć się "starych" mechanizmów wielowątkowości jeśli ostatnio różne nowe rozwiązania działają na wyższym poziomie abstrakcji? No po to aby wiedzieć dlaczego one działają na wyższym poziomie abstrakcji i dlaczego w ogóle działają.
W jakieś wielkie i wyczerpujące opisy w tym artykule się nie bawiłem bo wszystko omówimy sobie w trakcie warsztatu.
Pojedyncze wątki na start
object SingleThreadExample { def main(args: Array[String]) { val r=new Thread(new MyRunnable("Interfejs")) val t=new MyThread("Klasa") r.start() t.start() r.join() //czekamy na zakończenie obydwu watków t.join() } class MyThread(m:String) extends Thread { override def run=println(s"Jestem w watku : ${m}") } class MyRunnable(m:String) extends Runnable { def run = println(s"Jestem w runnable : ${m}") } }Nauka :
- Runnable to interfejs ale w Scali nie ma słowa kluczowego implements.
- Kiedy damy s przed stringiem to można wygodnie wstawiać weń zmienne
- Na końcu dajemy join aby główny wątek main nie skończył się przed wątkami badanymi
Kilka wątków dla współpracy
W tym przykładzie tworzymy kilka wątków, które współpracują ze sobą sumując wybrane elementy z tablicy i następnie wyliczają ostateczną sumę. Takie MapReduce dla ubogich. Z przykładu można się także nauczyć ciekawego sposobu inicjalizacji kolekcji poprzez "tabulate" a także trochę deklaratywnych transformacji tychże kolekcji.
A i w ScalaIDE działa już generowanie metod w traitach przy pomocy ctrl+1.
object CoupleThreeadsCooperation { def main(args: Array[String]) { val n=100 val v=Vector.tabulate(n)(i=>i*2) //1 val runnables=(0 to n/10).map(i=>new Summarizer(i*10,v)) //2 val threads=runnables.map(new Thread(_)) //2 threads.foreach(_.start()) //3 threads.foreach(_.join) //3 val sum=runnables.map(_.result).sum //4 println(sum) } class Summarizer(startIndex:Int,v:Vector[Int]) extends Runnable { private var _result:Int=0 // 5 def run(): Unit = { val dataForThread=v.slice(startIndex, startIndex+10) println(s"summarizing elements ${startIndex} : ${startIndex+10} - ${Thread.currentThread().getName}") //6 _result=dataForThread.sum } def result=_result //5 } }
- Używamy wektora bo tam wydajniej dostać się do danego elementu z indexem "i" także "slice" też pewnie działa szybciej
- Zamiast dziwnymi pętlami przetwarzamy dane lamdami.
- Podobnie można zażądać wywołania konkretnej metody an wszystkich elementach
- Przy tej linii w ramach odpowiedniego czasu i chęci słuchaczy można wspomnieć jak to jest zrobione, że dla kolekcji intów mamy metodę sum
- Jest stan i jest mutable :(
- Na potrzeby ćwiczenia walimy tekst zwykłym println
Kilka wątków - rywalizacja
Tym razem zobaczymy dlaczego programowanie wielowątkowe może nie działać. Uruchamiamy 100 wątków, każdy zwiększa stan licznika o 1- powinno być 100 - powinno a nie jest.
object CoupleThreadsWithGlobalState { private var global=0 def main(args: Array[String]) { val workers=Seq.fill(100)(new Worker()) //1 workers.foreach(_.start) workers.foreach(_.join) println(s"global na końcu ${global}") } class Worker extends Thread{ override def run=global=global+1 } }
- "Seq" - tutaj tak przy okazji skąd się wzięło Seq - zerknijcie tutaj - Scala - Hierarchia kolekcji
Locki
object CoupleThreadsWithGlobalStateGuardedByLocks { private var global=0 private val monitor=new Object() //1 pokazać przykład ze złym monitorem def main(args: Array[String]) { val workers=Seq.fill(100)(new Worker()) workers.foreach(_.start) workers.foreach(_.join) println(s"global na końcu ${global}") } class Worker extends Thread{ override def run=monitor.synchronized{ //1 global=global+1 } } }
- W scali nie ma słowa kluczowego synchronized. Jest za to metoda na AnyRef, która robi to samo
Atomic Integer
AtomicInteger wykorzystuje specjalne instrukcje procesora, który w jednym cyklu może wykonać dwie operacje w sposób atomowy. Procesory robi się z krzemu, a krzem uzyskuje chyba z piasku. W Afryce i Azji działają gangi, które nielegalnie przemycają piasek - ale wracając do przykładu zastosowanie AtomicInteger bardzo uprościło kod
object CoupleThreadsWithGlobalStateGuardedByLocks { private val global=new AtomicInteger(0) def main(args: Array[String]) { val workers=Seq.fill(100)(new Worker()) workers.foreach(_.start) workers.foreach(_.join) println(s"global na końcu ${global}") } class Worker extends Thread{ override def run=global.incrementAndGet() } }
Producent konsument - kolejka
Tutaj trochę dłuższy przykład. Tym razem do trzech współpracujących wątków dodajemy kolejkę, na której blokują się wątki producenta i konsumenta w zależności czy kolejka jest pusta czy pełna.
object ProducerConsumerQueue { def main(args: Array[String]) { val q: BlockingQueue[Int] = new LinkedBlockingQueue(3) //1 val p1 = new Thread(new Producer(q)) val p2 = new Thread(new Producer(q)) val c = new Thread(new Consumer(q)) p1.start() c.start() p2.start() } class Producer(q: BlockingQueue[Int]) extends Runnable { private val r = new Random() @tailrec //3 final def run(): Unit = { val v = r.nextInt(10) q.put(v) //2 println(s"producing ${v} : queue size ${q.size}") TimeUnit.SECONDS.sleep(3) run() //3 } } class Consumer(q: BlockingQueue[Int]) extends Runnable { def run(): Unit = { while (true) { val v = q.take() //2 println(s"consumed ${v} : queue size ${q.size}") TimeUnit.SECONDS.sleep(2) } } } }
- Kolejkę ustawiamy na 3 elementy aby się "zapchała" w trakcie pracy
- put i take blokują wątek w oczekiwaniu na rezultat
- A to taki eksperyment - teoretycznie zastąpiliśmy popularną pętlę "while(true)" rekurencją - chyba działa
Executory
W poniższym ćwiczeniu nie będziemy już tworzyć wątków z palca lecz użyjemy dodanych do Javy5 "Executorów" (lub po polsku Egzekutorów lub poprawnie po angielsku Executors)
object ExecutorsExample { def main(args: Array[String]) { val e=Executors.newFixedThreadPool(3) //1 (1 to 100).foreach{i=> println(s"run task ${i}") //2 e.submit(new Task()) } e.shutdown() } class Task extends Runnable { def run(): Unit = { println(s" running thread ${Thread.currentThread().getName}") //2 TimeUnit.SECONDS.sleep(2) } } }
- Inicjalizujemy pulę złożoną z trzech wątków
- I tutaj w logach zobaczymy, iż pomimo tego, że chcemy wykonania się 100 wątków to tylko trzy jadą w jednej chwili.
run task 93 run task 94 run task 95 run task 96 run task 97 run task 98 run task 99 run task 100 running thread pool-1-thread-1 running thread pool-1-thread-3 running thread pool-1-thread-2 running thread pool-1-thread-1 running thread pool-1-thread-3 running thread pool-1-thread-2
Callable
Tutaj zrobimy jeszcze raz to ćwiczenie, w którym watki liczą sumę elementów kolekcji. Drogi czytelniku zauważ, że tym razem nie ma potrzeby przechowywania żadnego stanu w wątkach ani nic takiego gdyż używamy Callable , które zwyczajnie zwraca rezultat.
object CalablesExample { def main(args: Array[String]) { val n=100 val v=Vector.tabulate(n)(i=>i*2) val e=Executors.newFixedThreadPool(5) val callables=(0 to n/10).map(i=>new Summarizer(i*10,v)) val futures=callables.map(e.submit(_)) val result=futures.map(_.get).sum println(result) e.shutdown() //1 e.awaitTermination(10, TimeUnit.SECONDS) } class Summarizer(startIndex:Int, v:Vector[Int]) extends Callable[Int] { def call(): Int = { val dataForThread=v.slice(startIndex, startIndex+10) println(s"summarizing elements ${startIndex} : ${startIndex+10} - ${Thread.currentThread().getName}") //6 dataForThread.sum } } }
- Pamiętajcie wyłączać po sobie światło.
Fork Join
I na koniec bonus z Javy7 - ForkJoin Framework - charakteryzuje się tym, że wątki sobie zabierają pracę jak nic nei robią.
object ForkJoinExample { def main(args: Array[String]) { val n = 100 val v = Vector.tabulate(n)(i => i * 2) //1 val t = new Task(v, 0, v.length) val p = new ForkJoinPool() //3 p.execute(t) p.shutdown() p.awaitTermination(10, TimeUnit.SECONDS) println(s"result ${t.get}") } import ForkJoinTask._ //4 class Task(is: Seq[Int], start: Int, end: Int) extends RecursiveTask[Int] { //5 def compute(): Int = { println(s"calculating ${start} - ${end}") if (end - start < 10) is.slice(start, end + 1).sum else { val mid = (start + end) / 2 val t1 = new Task(is, start, mid) val t2 = new Task(is, mid + 1, end) invokeAll(t1, t2) //6 t1.get() + t2.get() //7 } } } }
- Tutaj jakieś dane w kolekcji podobnie jak w poprzednich przykładach
- Tu był krzyż a teraz już go nie ma
- Nowy typ puli - FooooorkJoin
- To jest ciekawe , będziemy wykorzystywać statyczne metody z ForkJoinTask i w scali trzeba je niezależnie zaincludować (przynajmniej w ScalaIDE by działało.)
- Nowy typ klas do rozdziedziczenia - RecursiveTask[
- Statyczna klasa, która wywołuje pod-taski. Jak widać to my musimy zadbać o to by stworzyć ich odpowiednią ilość (i jakość).
- Na koniec suma i koniec
Podsumowanie
Jak już wspominałem w jakieś wielkie i wyczerpujące opisy się nie bawiłem bo wszystko omówimy sobie w trakcie warsztatu. Jeśli zajęcia się spodobają to na pewno będziemy je powtarzać i materiał uzupełniać.
To by było tyle jeśli chodzi o cześć pierwszą a w kolejnych : ThreadLocal,Semafory,Phasery,Lacze, Bariery, ThreadFactory i inne takie. A później(Albo w międzyczasie) jakieś najnowsze rozwiązania jak Akka i coś z JAvy8.
Oczywiście sam biorę udział w procesie edukacji, także jeśli ktoś znajdzie błędy w przykładach niech śmiało daje znać.
Brak komentarzy:
Prześlij komentarz