poniedziałek, 26 stycznia 2015

Warsztaty o programowaniu wielowątkowym z Javy ale w Scali (chyba część pierwsza)

Poniżej materiał do warsztatów uczących używania mechanizmów wielowątkowych dostępnych w Javie aleee programować będziemy w Scali. Kiedy się odbędzie warsztat? Warsztat odbędzie się w momencie jego dobycia. Pewnie w przyszłym miesiącu.

Celem warsztaty będzie zapoznanie uczestników (którzy w założeniu nie stykają się z problemami wielowątkowości na co dzień) z ewolucją którą przeszła Java do wersji 1.7 - (java.util.concurrent, fork-join i takie tam.) Jednocześnie nie zarzucając słuchaczy zbyt wielką listą szczegółów.

Aby upiec dwie pieczenie na jednym ruszcie - będziemy przy okazji uczyć się Scali. To trochę jak nauka angielskiego poprzez tłumaczenie najnowszego wydania "Świata Dorsza" na tenże język.

Na warsztat przygotuję jakieś próbki kodu tak aby uczestnicy nie musieli pisać wszystkiego od początku. Kod będzie kompletny np. w 70% i nauka odbywać będzie się poprzez uzupełnienie brakujących fragmentów z towarzyszącym niezwykle krytycznym procesem wyciągania wniosków.

Po co uczyć się "starych" mechanizmów wielowątkowości jeśli ostatnio różne nowe rozwiązania działają na wyższym poziomie abstrakcji? No po to aby wiedzieć dlaczego one działają na wyższym poziomie abstrakcji i dlaczego w ogóle działają.

W jakieś wielkie i wyczerpujące opisy w tym artykule się nie bawiłem bo wszystko omówimy sobie w trakcie warsztatu.

Pojedyncze wątki na start

object SingleThreadExample {

  def main(args: Array[String]) {
    val r=new Thread(new MyRunnable("Interfejs"))
    val t=new MyThread("Klasa")
    
    r.start()
    t.start()
    
    r.join() //czekamy na zakończenie obydwu watków
    t.join()
  }
  
  class MyThread(m:String) extends Thread {
      override def run=println(s"Jestem w watku : ${m}")
  }  
  
  class MyRunnable(m:String) extends Runnable {
    def run = println(s"Jestem w runnable : ${m}")
  }
}
Nauka :
  • Runnable to interfejs ale w Scali nie ma słowa kluczowego implements.
  • Kiedy damy s przed stringiem to można wygodnie wstawiać weń zmienne
  • Na końcu dajemy join aby główny wątek main nie skończył się przed wątkami badanymi

Kilka wątków dla współpracy

W tym przykładzie tworzymy kilka wątków, które współpracują ze sobą sumując wybrane elementy z tablicy i następnie wyliczają ostateczną sumę. Takie MapReduce dla ubogich. Z przykładu można się także nauczyć ciekawego sposobu inicjalizacji kolekcji poprzez "tabulate" a także trochę deklaratywnych transformacji tychże kolekcji.

A i w ScalaIDE działa już generowanie metod w traitach przy pomocy ctrl+1.

object CoupleThreeadsCooperation {

  def main(args: Array[String]) {
    val n=100
    val v=Vector.tabulate(n)(i=>i*2) //1
    
    val runnables=(0 to n/10).map(i=>new Summarizer(i*10,v)) //2
    val threads=runnables.map(new Thread(_)) //2
    threads.foreach(_.start()) //3
    threads.foreach(_.join)  //3
    
    val sum=runnables.map(_.result).sum //4
    println(sum)
  }
  
  class Summarizer(startIndex:Int,v:Vector[Int]) extends Runnable {
    
    private var _result:Int=0 // 5
    
    def run(): Unit = {
      val dataForThread=v.slice(startIndex, startIndex+10)
      println(s"summarizing elements ${startIndex} : ${startIndex+10} - ${Thread.currentThread().getName}") //6
      _result=dataForThread.sum
    }
    
    def result=_result //5
  }
}
  1. Używamy wektora bo tam wydajniej dostać się do danego elementu z indexem "i" także "slice" też pewnie działa szybciej
  2. Zamiast dziwnymi pętlami przetwarzamy dane lamdami.
  3. Podobnie można zażądać wywołania konkretnej metody an wszystkich elementach
  4. Przy tej linii w ramach odpowiedniego czasu i chęci słuchaczy można wspomnieć jak to jest zrobione, że dla kolekcji intów mamy metodę sum
  5. Jest stan i jest mutable :(
  6. Na potrzeby ćwiczenia walimy tekst zwykłym println

Kilka wątków - rywalizacja

Tym razem zobaczymy dlaczego programowanie wielowątkowe może nie działać. Uruchamiamy 100 wątków, każdy zwiększa stan licznika o 1- powinno być 100 - powinno a nie jest.

object CoupleThreadsWithGlobalState {

  private var global=0
  
  def main(args: Array[String]) {
    val workers=Seq.fill(100)(new Worker()) //1
    workers.foreach(_.start)
    workers.foreach(_.join)
    
    println(s"global na końcu ${global}")
  }
  
  class Worker extends Thread{
    override def run=global=global+1
  }
}
  1. "Seq" - tutaj tak przy okazji skąd się wzięło Seq - zerknijcie tutaj - Scala - Hierarchia kolekcji

Locki

object CoupleThreadsWithGlobalStateGuardedByLocks {

  private var global=0
  private val monitor=new Object() //1 pokazać przykład ze złym monitorem
  
  def main(args: Array[String]) {
    val workers=Seq.fill(100)(new Worker())
    workers.foreach(_.start)
    workers.foreach(_.join)
    
    println(s"global na końcu ${global}")
  }
  
  class Worker extends Thread{
    override def run=monitor.synchronized{  //1
    	  global=global+1 
      }
  }
}
  1. W scali nie ma słowa kluczowego synchronized. Jest za to metoda na AnyRef, która robi to samo

Atomic Integer

AtomicInteger wykorzystuje specjalne instrukcje procesora, który w jednym cyklu może wykonać dwie operacje w sposób atomowy. Procesory robi się z krzemu, a krzem uzyskuje chyba z piasku. W Afryce i Azji działają gangi, które nielegalnie przemycają piasek - ale wracając do przykładu zastosowanie AtomicInteger bardzo uprościło kod

object CoupleThreadsWithGlobalStateGuardedByLocks {

  private val global=new AtomicInteger(0)
  
  def main(args: Array[String]) {
    val workers=Seq.fill(100)(new Worker())
    workers.foreach(_.start)
    workers.foreach(_.join)
    
    println(s"global na końcu ${global}")
  }
  
  class Worker extends Thread{
    override def run=global.incrementAndGet()
  }
}

Producent konsument - kolejka

Tutaj trochę dłuższy przykład. Tym razem do trzech współpracujących wątków dodajemy kolejkę, na której blokują się wątki producenta i konsumenta w zależności czy kolejka jest pusta czy pełna.

object ProducerConsumerQueue {

  def main(args: Array[String]) {
    val q: BlockingQueue[Int] = new LinkedBlockingQueue(3) //1
    val p1 = new Thread(new Producer(q))
    val p2 = new Thread(new Producer(q))
    val c = new Thread(new Consumer(q))

    p1.start()
    c.start()
    p2.start()
  }

  class Producer(q: BlockingQueue[Int]) extends Runnable {

    private val r = new Random()

    @tailrec  //3
    final def run(): Unit = {
      val v = r.nextInt(10)
      q.put(v)       //2
      println(s"producing ${v} :  queue size ${q.size}")
      TimeUnit.SECONDS.sleep(3)
      run() //3
    }
  }

  class Consumer(q: BlockingQueue[Int]) extends Runnable {
    def run(): Unit = {
      while (true) {
        val v = q.take() //2
        println(s"consumed ${v} :  queue size ${q.size}")
        TimeUnit.SECONDS.sleep(2)
      }
    }
  }
}
  1. Kolejkę ustawiamy na 3 elementy aby się "zapchała" w trakcie pracy
  2. put i take blokują wątek w oczekiwaniu na rezultat
  3. A to taki eksperyment - teoretycznie zastąpiliśmy popularną pętlę "while(true)" rekurencją - chyba działa

Executory

W poniższym ćwiczeniu nie będziemy już tworzyć wątków z palca lecz użyjemy dodanych do Javy5 "Executorów" (lub po polsku Egzekutorów lub poprawnie po angielsku Executors)

object ExecutorsExample {

  def main(args: Array[String]) {
    val e=Executors.newFixedThreadPool(3) //1
    
    (1 to 100).foreach{i=>
      println(s"run task ${i}") //2
      e.submit(new Task())
    }
    e.shutdown()
  }
  
  class Task extends Runnable {
    def run(): Unit = {
      println(s" running thread ${Thread.currentThread().getName}") //2
      TimeUnit.SECONDS.sleep(2)
    }
  }
}
  1. Inicjalizujemy pulę złożoną z trzech wątków
  2. I tutaj w logach zobaczymy, iż pomimo tego, że chcemy wykonania się 100 wątków to tylko trzy jadą w jednej chwili.
run task 93
run task 94
run task 95
run task 96
run task 97
run task 98
run task 99
run task 100
 running thread pool-1-thread-1
 running thread pool-1-thread-3
 running thread pool-1-thread-2
 running thread pool-1-thread-1
 running thread pool-1-thread-3
 running thread pool-1-thread-2

Callable

Tutaj zrobimy jeszcze raz to ćwiczenie, w którym watki liczą sumę elementów kolekcji. Drogi czytelniku zauważ, że tym razem nie ma potrzeby przechowywania żadnego stanu w wątkach ani nic takiego gdyż używamy Callable , które zwyczajnie zwraca rezultat.

object CalablesExample {

  def main(args: Array[String]) {
    val n=100
    val v=Vector.tabulate(n)(i=>i*2)
    val e=Executors.newFixedThreadPool(5)
    
    val callables=(0 to n/10).map(i=>new Summarizer(i*10,v))
    val futures=callables.map(e.submit(_))
    val result=futures.map(_.get).sum
    
    println(result)
    e.shutdown() //1
    e.awaitTermination(10, TimeUnit.SECONDS)
  }
  
  class Summarizer(startIndex:Int, v:Vector[Int]) extends Callable[Int] {
    
    def call(): Int = {
      val dataForThread=v.slice(startIndex, startIndex+10)
      println(s"summarizing elements ${startIndex} : ${startIndex+10} - ${Thread.currentThread().getName}") //6
      dataForThread.sum
    }
    
  }
}
  1. Pamiętajcie wyłączać po sobie światło.

Fork Join

I na koniec bonus z Javy7 - ForkJoin Framework - charakteryzuje się tym, że wątki sobie zabierają pracę jak nic nei robią.

object ForkJoinExample {

  def main(args: Array[String]) {
    val n = 100
    val v = Vector.tabulate(n)(i => i * 2) //1

    val t = new Task(v, 0, v.length)
    val p = new ForkJoinPool() //3
    p.execute(t) 

    p.shutdown()
    p.awaitTermination(10, TimeUnit.SECONDS)
    println(s"result ${t.get}")
  }

  import ForkJoinTask._ //4

  class Task(is: Seq[Int], start: Int, end: Int) extends RecursiveTask[Int] { //5
    def compute(): Int = {
      println(s"calculating ${start} - ${end}")
      if (end - start < 10)
        is.slice(start, end + 1).sum
      else {
        val mid = (start + end) / 2
        val t1 = new Task(is, start, mid)
        val t2 = new Task(is, mid + 1, end)
        invokeAll(t1, t2) //6
        t1.get() + t2.get() //7
      }
    }
  }
}
  1. Tutaj jakieś dane w kolekcji podobnie jak w poprzednich przykładach
  2. Tu był krzyż a teraz już go nie ma
  3. Nowy typ puli - FooooorkJoin
  4. To jest ciekawe , będziemy wykorzystywać statyczne metody z ForkJoinTask i w scali trzeba je niezależnie zaincludować (przynajmniej w ScalaIDE by działało.)
  5. Nowy typ klas do rozdziedziczenia - RecursiveTask[
  6. Statyczna klasa, która wywołuje pod-taski. Jak widać to my musimy zadbać o to by stworzyć ich odpowiednią ilość (i jakość).
  7. Na koniec suma i koniec

Podsumowanie

Jak już wspominałem w jakieś wielkie i wyczerpujące opisy się nie bawiłem bo wszystko omówimy sobie w trakcie warsztatu. Jeśli zajęcia się spodobają to na pewno będziemy je powtarzać i materiał uzupełniać.

To by było tyle jeśli chodzi o cześć pierwszą a w kolejnych : ThreadLocal,Semafory,Phasery,Lacze, Bariery, ThreadFactory i inne takie. A później(Albo w międzyczasie) jakieś najnowsze rozwiązania jak Akka i coś z JAvy8.

Oczywiście sam biorę udział w procesie edukacji, także jeśli ktoś znajdzie błędy w przykładach niech śmiało daje znać.

Brak komentarzy:

Prześlij komentarz