czwartek, 4 grudnia 2014

Ciekawsze modele wielowątkowe (a Akka) + sprawy społecznościowe

Kącik społecznościowy

Jutro zapraszamy na prezentację Marcin Grzejszcza­ka Mikroserwisy

JUG Łódź Misja

Cezary Draus - zarząd JUG niezależna improwizacja o misji JUG Łódź.

moim zdaniem warto by propagować przede wszystkim samorozwój
jako sposób na życie
no ludzie tego nie kumaja chyba
pewnie mysla ze sa IT arystokracja
bo tak im się wmawia
a do tego idzie niz demograficzny
wiec raczej ludzi zdolnych wiecej nie bedzie
a samo sie nie nakurwi
Golf R sam się nie kupi
musisz kurwa sie dorobic
albo krasc
albo sie kurwic
albo sie nauczyc
wybór należy do Ciebie

a samo się nie nakurwi - to od tej chwili nieoficjalna hasło szczęścia poprzez samorozwój.

Właściwy artykuł

Czy jest sens nauki czegoś co wydaje się w danej chwili niepotrzebne(to pytanie retoryczne także nie należy się nad tym zastanawiać tylko czytać dalej)? Przez wiele lat web development wyglądał dla mnie bardzo jednowątkowo. Zazwyczaj jakiś serwer ogarniał requesty przy pomocy swojej puli wątków a ja jako programista musiałem tylko uważać na globalny stan podczas gdy cała reszta szła od góry do dołu po linii prostej (zwaliduj dane, wywołaj jakieś funkcje i zapisz do bazy danych)

Teraz procki mają coraz więcej rdzeni a sam PlayFramework na dzień dobry 4 różne pule wątków i trzeba ten temat zacząć ogarniać. Istnieje kilka ciekawych pozycji o wątkach w Javie i np. taka "Java Concurrency in Practice" nie tyle opisuje jak dobrze zabrać się do tematu ile przedstawia na jak wiele złych sposobów można to zrobić.

Dlatego też warto się zainteresować pewnymi rozwiązaniami wysokopoziomowymi, które pojawiają się tu i ówdzie a w których nie trzeba aż tak bardzo wchodzić w interakcje pomiędzy poszczególnymi wątkami.

A jeśli to nie wystarczy to...

... Historia Hrabiego Monte Christo opowiada historię kolesia, który w swojej naiwności daje się złapać w intrygę swoich fałszywych przyjaciół i ląduje w więzieniu na wyspie gdzie przypadkiem poznaje innego kolesia, który ma wiedzę o wszystkim i tę wiedzę(+ gdzie jest skarb) przekazuje głównemu bohaterowi. Cały proces trwa wiele lat ale nasz ziom w końcu ogarnia swoją ignorancję i zdobywa nowe, pełniejsze spojrzenie na świat.

W przypadku tej historii potrzebny był szok by wyzwolić naukę - a mamy przecież o wiele przyjemniejsze opcje - jak np. czytanie ciekawego internetu zamiast oglądania telewizji i takich tam.

Jak związek ma ta historia z nowoczesnymi modelami programowania wielowątkowego? Żaden ale przynajmniej stanowi ciekawy przerywnik, daje uzasadnienie dla wklejenia fajnego obrazka i być może zabija wrażenie, że poniższa treść jest zwyczajnie spisana z internetu. Ale wracając do tematu...

Klasyka

Jak zwykle za pomoc edukacyjną posłuży naciągany problem, który można łatwo rozwiązać za pomocą AtomicInteger - ale dla potrzeb ćwiczenia załóżmy, że nie można

  • Globalny stan - licznik
  • Kilka wątków, które wchodzą sobie w drogę
object LockingExerciseSimple {

  private var counter = 0

  def main(args: Array[String]) {
    val threads = (1 to 100).map(_ => createThread())
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(counter)
  }

  def createThread() = new Thread {
    override def run() = {
      counter = counter + 1
    }
  }
}

Uruchamiając ten kod nigdy nie powinniśmy otrzymać wyniku 100 gdyż ile tam wątków najedzie na siebie i nadpisze licznik.

By rozwiąπać problem klasycznie tworzymy "loka" wokół inkrementacji licznika i zaczynamy się zastanawiać czy o czymś przypadkiem nie zapomnieliśmy co może zakończyć się zakleszczeniem.

  • Globalny stan - licznik
  • Kilka wątków, które wchodzą sobie w drogę
  • Blokowanie wątków na locku
object LockingSolutionSimple {
 private var counter = 0
 private val lock=new ReentrantLock()

  def main(args: Array[String]) {
    val threads = (1 to 100).map(_ => createThread())
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(counter)
  }

  // TO JEST WAZNY FRAGMENT
  def createThread() = new Thread {
    override def run() = {
      lock.lock()
      try{
       counter = counter + 1
      }finally{
        lock.unlock()
      }
    }
  }
}

Agent

  • Stan nie jest dostępny globalnie
  • Dostęp synchroniczny
  • Modyfikacje asynchroniczne

Generalnie agent to specyficzny rodzaj aktora - czyli warto w pierwszej kolejności wspomnieć o aktorach. Ale aktorzy są na topie i każdy o nich wspomina i takie powielanie opisów byłoby nudne.

Gdzieś na githubie widziałem wątek, w którym jacyś ludzie implementujący bugfixy w akka dyskutowali czy jak w tym a tym miejscu będzie volatile to dobry asembler javy powstanie czy coś w ten deseń. To co chcę tutaj powiedzieć,to to że jakieś mądre głowy zadbały już o to, że logika w aktorze wykona się jednowątkowo i nie będzie deadlocków.

Agent to właśnie taki specjalny aktor, który z założenia kontroluje dostęp do konkretnej zmiennej. Przystosowanie polega na tym, że ów stan zmiennej można pobrać synchronicznie w dowolnym momencie.

object AgentsSolution {
  import akka.agent.Agent
  import scala.concurrent.ExecutionContext.Implicits.global

  private var counter = Agent(0)

  def main(args: Array[String]) {
    //każdy wątek zwiększa stan licznika o jeden
    val threads = (1 to 100).map(_ => createThread())
    threads.foreach(_.start())
    threads.foreach(_.join())
    
    /** 1 updejty są wysyłane do agenta asynchronicznie dlatego na początku get wypisze zero**/
    println(counter.get)
    Thread.sleep(1000)
    // ale już po sekundzie powinna być setka
    println(counter.get)
    
    /**2 
    // w dowolnej chwili można pobrać z agenta future, który zakończy się gdy wszystkie funkcje z danej chwili zostaną wykonane
    val future=counter.future
    future.onComplete {
      case result => println(result)
    }
    Thread.sleep(1000)**/
    

  }
  // TO JEST WAZNY FRAGMENT
  def createThread() = new Thread {
    override def run() = {
      // a tak się wysyła do agenta funkcje, które on wykona jednowątkowo
      counter.send(c => c + 1)
    }
  }
}

STM

To w zasadzie zwykłe transakcje w pamięci

  • Globalny stan opakowany w Ref
  • Zmiany odbywają się w transakcji
  • w przypadku konfliktu mamy rollback
object STMSimpleSolution {
  //zwykle transakcje ale tylko dla danych, które są w Ref
  private var counter = Ref(0)

  def main(args: Array[String]) {
    val threads = (1 to 100).map(_ => createThread())
    threads.foreach(_.start())
    threads.foreach(_.join())
    
    atomic{implicit txn=>
     println(counter.get)
    }
  }

  // TO JEST WAZNY FRAGMENT
  def createThread() = new Thread {
    override def run() = {
      // jeśli dwa wątki wjadą na raz z tą samą wartością to zmiany będą rollbackowane
      // i jedna z transakcji powtórzona
      atomic{implicit txn=>
          val currentValue=counter.get
       counter.set(currentValue+1)
      }
    }
  }
}

Dataflow

  • Nie ma faktów są jedynie obietnice
  • Do zmiennych dane można przypisać tylko raz
  • Każdy flow to inny wątek ale wszystko magicznie się układa

To jest fajne - przeterminowane -ale fajne bo zagina czasoprzestrzeń. Bo zobaczmy chociaż taką sytuację :

    // CAŁY FRAGMENT W ZASADZIE JEST WAŻNY
    val counter = Promise[Int]()
    val v1, v2 = Promise[Int]()

    //tutaj jest dodawanie
    flow {
      threadName()
      counter << v1() + v2()
      println("counter = " + counter())
    } 
    // a dopiero tutaj ustalenie co dodawać
    flow {
      threadName()
      v1 << 40 
    }
    flow {
      threadName()
      v2 << 2 
    }
    // i każdy flow odbywa się w innym wątku
    //ForkJoinPool-1-worker-11
    //ForkJoinPool-1-worker-13
    //ForkJoinPool-1-worker-9

To jest fajne ale już deprecated bo w najnowszej wersji Akka zastąpiła je scala async

Async Aysnc

  • Instrukcje do asynchronicznego odpalania obliczeń w osobnym wątku są tworzone na prawdę bardzo prosto
  • Można określać zależność czasową pomiędzy blokami przy pomocy prostego await
    var counter = 0

    val future1 = async {
      threadName()
      Thread.sleep(300)
      40
    }
    val future2 = async {
      threadName()
      Thread.sleep(200)
      2
    }

    val sumFuture = async {
      threadName()
      counter = await(future1) + await(future2)
      counter
    }

    sumFuture.onSuccess {
      case result => println(result)
    }
//ForkJoinPool-1-worker-13
//ForkJoinPool-1-worker-11
//ForkJoinPool-1-worker-9
//42

Na koniec

Z ciekawych modeli wielowątkowych, o których słyszałem są jeszcze "channels", które na pewno występują w języku Go i bibliotece clojure core.async ale nic o tym nie wiem to też nic nie napiszę ale zakończę artykuł w tym miejscu.

Brak komentarzy:

Prześlij komentarz