Vés al contingut

Haskell concurrent

De la Viquipèdia, l'enciclopèdia lliure
Infotaula de llenguatge de programacióHaskell concurrent
Tipusllenguatge de programació Modifica el valor a Wikidata

Haskell concurrent amplia Haskell98 amb concurrència explícita.

Els dos conceptes principals en què es basa Haskell concurrent són les Mutable variables MVar α i la possibilitat d'engegar un nou fil d'execució via forkIO.

Concurrència

[modifica]
forkIO
engega un fil d'execució lleuger del planificador del Run Time System que, segons el compilador de Haskell concurrent, pot implementar multitasca cooperativa o bé arrabassadora (ang:preemptive)).[1]

Vegeu "La implementació de concurrència de GHC".[2]

mecanismes de comunicació
tipus mònada generadors mòdul descripció
MVar a[3] IO newMVar x
-- var buida per a un tipus T
newEmptyMVar :: IO (MVar T)
Control.Concurrent.MVar vars. globals sincronitzades que es buiden en consultar-les
doblen la funcionalitat com a
* bústia d'un sol element per baldes (ang:locks)
* semàfor binari
Chan a[4] IO newChan x
Control.Concurrent.Chan bústia amb cua il·limitada, per baldes (ang:locks)
TVar a[5] STM / IO newTVar x -- encapsulable
newTVarIO x -- global
Control.Concurrent.STM.TVar posicions de memòria compartida
suporten transaccions de memòria atòmiques
TMVar a[6] STM / IO newTMVar x -- encapsulable
newTMVarIO x -- global
Control.Concurrent.STM.TMVar MVar (bústia de comunicació d'un sol elem.) protegida transaccionalment
TChan a[7] STM / IO newTChan x -- encapsulable
newTChanIO x -- global
Control.Concurrent.STM.TChan bústia de comunicació transaccional amb cua il·limitada

accés sincronitzat amb bústies d'un sol element anomenades MVar

[modifica]

En altres llenguatges, per assegurar la correcta actualització d'una variable, cal posar un monitor que impedeix l'accés a altres processos durant la lectura i actualització a la variable sincronitzada.

Aquí la lectura d'una MVar (takeMVar) transfereix l'objecte contingut al fil consumidor, deixant-la buida, provocant que els altres processos que la vulguin llegir hagin d'esperar que el fil productor reposi el contingut actualitzat (amb putMVar), aprofitant així el mecanisme de concurrència per bústies de missatges.[8]

  • "takeMVar mvar" bloca el fil d'exec. si mvar estava buida, per tant no disponible
  • "putMVar mvar valor" bloca si mvar estava plena, per tant ocupada

Generadors:

do 
 -- mvar nova amb contingut especificat
 mvarPlenaInicialment <- newMVar contingut -- newMVar :: t -> IO (MVar t)

 -- mvar buida per a un contingut de tipus T. especificant el tipus
 mvarBuidaInicialment <- newEmptyMVar :: IO (MVar T)

Una MVar té tres facetes:[8]

  • Variable amb accés sincronitzat
  • Bústia de comunicació d'un sol element (takeMVar com a receive, putMVar com a send)
  • Semàfor binari (takeMVar com a wait, putMVar com a signal)
mvarSemàfor <- newEmptyMVar :: IO (MVar Bool)

-- engega fil d'exec. i en acabar 
-- desperta el primer dels fils suspesos pendents de la MVar
threadId <- forkIO procés `finally` putMVar mvarSemàfor True
...
-- suspèn, tot esperant que el fil d'exec. de ''procés'' acabi
takeMVar mvarSemàfor

Darrerament s'hi han afegit crides no blocants (tryTakeMVar, tryPutMVar).

També tenim modifyMVar_ (composició de takeMVar i putMVar que retorna IO ()) i altres novetats.

L'accés a una variable MVar és d'avaluació tardana, per tant el contingut serà avaluat en el consumidor i no en el productor!!.[8]

La versió d'avaluació estricta Control.Concurrent.MVar.Strict[9] del paquet strict-concurrency pretén evitar pèrdues d'espai forçant l'avaluació a Head Normal Form en dipositar un objecte en una MVar (putMVar).

Els fils blocats es desperten per ordre de suspensió (FIFO)."[8]putMVar mvar" desperta el primer dels fils que ha cridat takeMVar amb la mateixa mvar, que s'emporta el valor i la deixa buida.

TMVar de Control.Concurrent.STM.TMVar
MVar amb protecció transaccional (Software Transactional Memory)
  • takeMVar mvar, si la variable és buida, en comptes de blocar provoca retry transaccional
  • putMVar mvar valor, si la variable és plena, en comptes de blocar provoca retry transaccional

bústies de comunicació amb MVar, Chan, BoundedChan

[modifica]

Canals amb sincronització per baldes (ang: locks).

  • MVar: bústies d'un sol element (de Control.Concurrent)
  • Chan: bústies amb cua il·limitada (de Control.Concurrent.Chan)
  • BoundedChan: bústies limitades (del paquet BoundedChan)

Exemples:

forkOS: llançament de fils d'execució lligats al del sistema (Bound threads)

[modifica]

Creació de fils d'execució per a crides FFI a biblioteques externes o a biblioteques amb allotjament local al fil.

GHC assigna habitualment els fils d'exec. lleugers de la multitasca GHC llançats amb forkIO, en relació N-M amb els fils del sistema (hi ha un fil del sistema per cada processador elemental, anomenats capability sobre els quals s'executen els llançats amb forkIO).[10][2][11]

forkOS: Per poder fer ús de característiques específiques dels fils d'execució del sistema, forkOS facilita el llançament d'un fil d'execució en relació 1-1 amb els del sistema (Fils d'execució "lligats als del sistema" (Bound threads)),[12] facilitant l'ús de crides externes (FFI: Foreign Function Interface) en un fil, o bé l'ús d'allotjament lligat al fil d'execució. Biblioteques com OpenGL requereixen aquest ús.[12][13]

modalitats de llançament

[modifica]

Vegeu ref.[10]

  • llançament de fil lleuger (els del planificador RTS):[14]
forkIO :: IO () {- procés -} -> IO ThreadId
  • llançament de fil lligat al del sistema:[15]
forkOS :: IO () -> IO ThreadId
  • llançament de fil d'execució indicant numèricament el processador (capability) on cal executar-lo:[16]
forkOn :: Int -> IO () -> IO ThreadId
  • llançament de fil d'exec. amb funció per executar en acabar:[17]
forkFinally :: IO a -> (Either SomeException a -> IO ()) {- enHaventAcabat -} -> IO ThreadId

memòria transaccional: variables TVar, i comunicació amb TMVar i TChan

[modifica]
  • les TVar són variables de "Memòria transaccional per programari" de l'anglès "Software transactional memory" conegut per la sigla STM.
  • TMVar i TChan són els equivalents transaccionals de MVar i Chan

Vegeu exemple #Concurrència condicionada amb TVars - Mònada STM - Memòria transaccional

Futurs

[modifica]

Paral·lelització i espera d'accions o càlculs llançats asíncronament.

Encadenament de càlculs asíncrons - la mònada Par

[modifica]

Permet combinar càlculs funcionals purs en paral·lel, encadenant resultats com a paràmetres de paral·lelitzacions subseqüents. Vegeu refs.[18][19]

  • La classe ParFuture especialitza una mònada modelant el comportament d'un objecte Futur: un objecte que havent engegat una acció asíncronament, possibilita l'obtenció del resultat posant el fil d'execució original en espera, fins que l'acció asíncrona acabi.
  • La classe ParIVar especialitza la ParFuture modelant una Promesa: una especialització escribible d'un Futur que possibilita resoldre l'acció asíncrona assignant un resultat.
  • El tipus (IVar a) és el de les variables d'assignació única, accessibles des de la tasca asíncrona i l'original.
  • El tipus (Par (Ivar a)) instancia les classes ParFuture i ParIvar
{-# LANGUAGE PackageImports #-}

import "monad-par" Control.Monad.Par (Par, runPar, spawn, get, put) -- put permet donar valor a una Promesa (Futur assignable) abans no acabi

f = (*2)
g = (/2)
h (a, b) = a + b
k (a, b) = a - b

calcula_paral·lelitzant :: Double -> Par (Double, Double)
calcula_paral·lelitzant x = do
 futur_fx <- spawn (return (f x)) -- engega l'avaluació en paral·lel de (f x)
 futur_gx <- spawn (return (g x)) -- engega l'avaluació en paral·lel de (g x)
 a <- get futur_fx -- espera que acabi el càlcul `fx` i n'obté el resultat
 b <- get futur_gx -- espera que acabi el càlcul `gx` ...
 futur_hab <- spawn (return (h (a,b))) -- engega en paral·lel (h (a,b))
 futur_kab <- spawn (return (k (a,b))) -- ...
 c <- get futur_hab -- espera resultat de `hab`
 d <- get futur_kab -- espera resultat de `kab`
 return (c,d)

main = do
 let res = runPar $ calcula_paral·lelitzant 4
 print res
cabal install monad-par
ghc --make -threaded -rtsopts -with-rtsopts=-N prova.hs
./prova
(10.0,6.0)

Per a càlculs que poden llançar excepcions, la biblioteca Async ofereix més varietats de tractament.

Operacions d'Entrada/Sortida asíncrones i simultànies

[modifica]

Amb la biblioteca Async.[20][21] -- Futurs com a efecte global IO, amb implementació com a functor aplicatiu en el tipus Concurrently (implementa les classes Applicative i Alternative).

operacions asíncrones
[modifica]
  • withAsync llança una operació IO asíncronament (sense esperar-ne la finalització) en un fil d'execució nou i n'assegura la terminació (basat en bracket sobre el recurs fil d'execució (obtenció de recurs: async io) (alliberament de recurs: cancel)).
  • wait async atura el fil actual en espera que s'acabi l'op. asíncrona designada per async, oferint-ne el resultat, i propagant-ne l'excepció en cas que n'hi hagi. Si hi ha hagut excepció al fil asíncron serà rellançada al fil actual.
  • waitBoth espera el resultat de dues accions asíncrones.
  • waitEither espera el resultat de la primera que acabi; waitEitherCancel cancela la restant
  • waitAny [Async a] espera la primera que acabi d'una llista d'accions asíncrones; waitAnyCancel cancela les restants

Hi ha versions amb el sufix Catch com waitCatch per caçar l'excepció del fil asíncron, retornant el resultat o l'error en un tipus Either, en comptes de rellançar l'excepció.

import Control.Concurrent.Async (withAsync, wait)

obtenirURLs url1 url2 = do
 -- engega l'execució asíncrona d'una acció i n'associa el Futur a la var. async1
 -- cas d'excepció dins el bloc de 'withAsync', avorta el fil d'execució de l'acció asíncrona
 withAsync (getURL url1) $ \async1 -> do -- el bloc que segueix s'executa al fil d'execució original
 withAsync (getURL url2) $ \async2 -> do -- engega l'execució asíncrona i la lliga a async2
 page1 <- wait async1 -- espera que acabi async1 i en retorna el resultat
 page2 <- wait async2 -- espera async2 ...
 -- si es produeix una excepció dins l'acció asíncrona d'async1, és a dir (wait async1) peta, 
 -- llavors el fil d'execució de l'acció d'async2 és avortat pel tractament d'excepcions del segon 'withAsync'
 -- i així no queden fils d'execució en segon pla penjats.
 return (page1, page2)
combinadors d'operacions simultànies
[modifica]

A la mateixa biblioteca async:[22]

  • concurrently simultaneja dues operacions, i en retorna ambdós resultats.
concurrently :: IO a -> IO b -> IO (a, b)
  • race simultaneja dues operacions retornant el resultat de la primera que acabi i avortant la restant.
race :: IO a -> IO b -> IO (Either a b)
  • mapConcurrently mapeja una funció amb efectes col·laterals (a -> IO b) a una col·lecció de valors, i les executa simultàniament, retornant-ne la col·lecció de resultats.
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
L'efecte Concurrently
[modifica]

A més alt nivell el tipus Concurrently[23] embolcalla l'operativa precedent en la implementació de les classes Applicative (aplicant l'esmentat combinador concurrently) i Alternative (aplicant race, per esperar la primera que acabi, cancel·lant les altres).

import Control.Concurrent.Async (Concurrently (runConcurrently))

obtenirURLs url1 url2 url3 = do
 (page1, page2, page3) <- runConcurrently $ -- runConcurrently és l'accessor del "newtype" Concurrently
 -- combina aplicativament els resultats dels efectes Concurrently en una Tupla 
 pure (,,) <*> Concurrently (getURL url1) 
 <*> Concurrently (getURL url2)
 <*> Concurrently (getURL url3)

-- la implementació d'Alternative (<|>) sobre Concurrently retorna la que acaba primer, cancel·lant les altres
ghci
Prelude>:m + Control.Applicative Control.Concurrent Control.Concurrent.Async Control.Exception
Prelude ...> :{ let esperaSegons :: Int -> IO Int 
 esperaSegons secs = threadDelay (secs * 1000000) >> return secs
 :}
Prelude ...> let ca = Concurrently $ esperaSegons 5
Prelude ...> let cb = Concurrently $ esperaSegons 3
Prelude ...> runConcurrently $ ca <|> cb -- amb l'op. (<|>) d'Alternative, avorta la que no acabi primer
3 
Prelude ...> let cc = Concurrently $ throwIO $ userError "excepció" -- per provar el llançament d'excepcions

Prelude ...> runConcurrently $ ca <|> cb <|> cc -- cc llança una excepció al fil asíncron que serà rellançada al principal
*** Exception: user error (excepció)

model d'àlgebra de processos CSP

[modifica]

Communicating Haskell Processes

[modifica]

La biblioteca "Communicating Haskell Processes" de la universitat de Kent implementa una àlgebra de processos CSP, basada en mònades.[24][25][26]

Aquesta biblio. consta de diversos paquets. Cal el paquet chp-plus del Hackage per fer córrer els exemples de les guies.

Si p i q són processos, s'hi defineix, entre d'altres, els següents operadors de composició:

 stop -- :: CHP () -- CSP: STOP o bé 0
 skip -- :: CHP a -- CSP: SKIP
 p <-> q -- alternativa, CSP: P | Q
 p <||> q -- paral·lelisme, CSP: P || Q
 p >> q -- seqüència, CSP: P; Q
 forever p -- iteració, CSP: ∗P
 p </> q -- alternativa amb prioritat
 p <&> q -- sincronia (''join''), espera que ambdós processos hagin acabat.

El codi (per a GHC 6.10) ha quedat una mica desfasat. Per compilar a GHC 7.4 cal solucionar ambigüitats per la col·lisió de símbols afegits a les noves versions de les biblioteques emprades i un símbol desaparegut de les noves versions de QuickCheck que es troba a la versió 2.3.* Passos:

# -- Module 'Test.QuickCheck.Property' does not export `liftIOResult'
# -- Al compilar QuickCheck-2.3.0.2 -- couldn't deduce (Show a) from (Integral a) -- a la signatura de ''ranges''
# descarregar QuickCheck-2.3.0.2
# actualitzar la versió al fitxer .cabal afegint-hi .1 (Version: 2.3.0.2.1)
# modif. Test.QuickCheck.Text afegint (Show a) a la signatura de la funció ''ranges''
cabal install

# -- solucionar al paquet "chp" mòdul Control.Concurrent.CHP.Clocks, l'ambigüitat dels símbols modifyTVar i modifyTVar'
# -- modifyTVar i modifyTVar' corresponen a funcions definides localment al mòdul
# import Control.Concurrent.STM hiding (modifyTVar, modifyTVar') 
# actualitzar la versió al fitxer .cabal afegint-hi .1
cabal install

cabal install chp-plus --constraint=QuickCheck==2.3.0.2.1 --constraint=pretty==1.1.0.* --constraint=chp==num_versió_modificada

El model d'actors evita la problemàtica dels blocatges, mitjançant la concurrència per pas de missatges, aïllant les estructures compartides en processos indepentdents anomenats actors que en gestionen l'estat.

la biblio Actor

[modifica]

Implementa actors multi-canal despatxant, a la recepció, en comptes de per missatge, per llista de missatges de canals diferents.[27]

Està en codi Haskell98 i caldrà seguir les instruccions a Compilador Haskell de Glasgow#Compilar codi antic Haskell98 amb GHC.

Cloud Haskell

[modifica]

Concurrència per pas de missatges per a sistemes distribuïts, a l'estil de l'Erlang. Aplica el model d'Actors als processos distribuïts de manera similar a les construccions del llenguatge i sistema concurrent distribuït de nodes de l'Erlang.[28][29][30]

Excepcions asíncrones

[modifica]

Les excepcions asíncrones, llançades des d'altres fils al propi, amb throwTo o bé killThread,[31] tenen la seva problemàtica. La biblioteca safe-exceptions aporta una millora, discriminant el tractament de les síncrones i les asíncrones.[32]

Excepcions no tractades en els fils subordinats

[modifica]

Les excepcions en els fils que hem llançat, que salten sense ésser tractades, es poden recollir, en el fil d'exec. pare, mitjançant setUncaughtExceptionHandler.[33]

setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()

Vegeu exemple a "The Unhandled Exception Handler".[34]

Paral·lelisme

[modifica]

Primitives de paral·lelisme - Compilació per a processadors multicor

[modifica]

Vegeu[35]

par

par:: a → b → b —activa el càlcul del primer operand en paral·lel (que s'encua en espera d'una CPU disponible) mentre que el segon s'executa al fil d'exec. actual, retornant el resultat d'aquest darrer.[36]

pseq

pseq:: a → b → b —avalua el primer operand en el fil d'exec. actual, de manera primerenca (estricta) i avalua el segon de manera tardana (lazy) quin resultat retorna. Vegeu "seq vs. pseq"[37][38]

opció multiprocessador

L'opció -threaded de "ghc --make" relliga el programa amb la biblio. del Run Time System multiprocessador, emprant diversos fils d'execució del sistema per possibilitar el paral·lelisme, altrament el relliga amb la de l'RTS uniprocessador.[39]

GHC conté un planificador de fils d'execució lleugers, llançats amb forkIO, que s'executen amb relació "M a N" sobre fils d'execució del sistema (1 per cada processador elemental present) anomenats Capability.[10]

{-# LANGUAGE PackageImports #-}

 import "parallel" Control.Parallel (par, pseq)

 parfib :: Int  Integer
 parfib 0 = 0
 parfib 1 = 1
 parfib n | n > 1 = nf2 `par` (nf1 `pseq` (nf1+nf2)) -- calc nf2 en un fil en paral·lel i nf1 al fil principal
 -- i seqüencialment, al fil principal,
 -- en acabar en retorna la suma
 where nf1 = parfib (n-1)
 nf2 = parfib (n-2)

 main = print $ parfib 10
-- compilació amb -threaded per fer servir la biblio. "Run Time System multi-processador"—afegir -rtsopts per poder afegir paràmetres al llançador per a l'R.T.S.
ghc—make -threaded -rtsopts parfib.hs—execució mostrant estadístiques "+RTS -s". Afegirem -Nx per un nombre x de processadors elementals.
-- "si el user time (temps en mode usuari) és major que l'elapsed time (temps transcorregut)
-- és que s'ha emprat més d'un processador[40]
-- proveu-ho també sense -Nx
-- per distingir les opcions d'execució específiques per al Haskell Run Time System de les del programa,
-- cal escriure els params. per al Haskell després de +RTS
-- i tancar amb -RTS si volem afegir, tot seguit, params. per al programa.

./parfib +RTS -s -N2

Estratègies

[modifica]

Les estratègies són funcions de coordinació de l'execució.[41][42] Vegeu també[43][44]

Per assegurar la constància dels resultats de les operacions, executades amb par i pseq, possiblement en fils d'execució paral·lels, convé seqüenciar-ne les operacions per garantir-ne els resultats. La mònada Eval serveix per aquest fi.[41]

import Control.Category ((>>>)) --- f >>> g = g. f

data Eval a = Done { runEval :: a }

-- aquest tipus genera els operadors inversos (constructor i accessor) dels registres d'un sol component:
-- Done :: a → Eval a
-- runEval :: Eval a → a

-- instanciem la classe mònada sobre el tipus ''Eval a'' per seqüenciar les operacions
instance Monad Eval where
 return x = Done x
 Done x >>= f = f x

-- elevem ''par'' i ''pseq'' al tipus de la mònada amb les funcions rpar i rseq
rseq, rpar :: a  Eval a
rseq x = x `pseq` return x
rpar x = x `par` return x

-- reescribint nfib
nfib :: Int  Int
nfib n | n <= 1 = 1
 | otherwise = runEval $ do
 x <- rpar (nfib (n-1))
 y <- rseq (nfib (n-2))
 return (x + y)
-----
-- ara, al tipus de ''rpar'' i ''rseq'' l'anomenarem "estratègia"

type Strategy a = a  Eval a

-- avalua amb l'estratègia
withStrategy :: Strategy a  a  a
withStrategy strat e = runEval (strat e)

-- using és withStrategy amb els param. canviats, per a ésser emprat com a op. infix
using == flip withStrategy

-- composició seqüencial d'estratègies
dot :: Strategy a  Strategy a  Strategy a
-- strat2 `dot` strat1 == withStrategy strat1 >>> strat2

Estratègies típiques:[42]

r0 -- estratègia buida (no avalua res), elem. neutre de la combinació ''dot''
rseq -- avalua a WHNF (Weak Head Normal Form)
rdeepseq -- avalua a Forma Normal
rpar -- encua l'avaluació per l'execució en paral·lel (''spark'')
----
evalList :: Strategy a  Strategy [a] -- estratègia per aplicar a una llista l'estratègia del primer arg.
parList :: Strategy a  Strategy [a] -- estratègia per aplicar a una llista, paral·lelitzant, l'estratègia del primer arg.
parMap :: Strategy b -> (a -> b) -> [a] -> [b]
  • parMap: combinador típic, (parMap estrat f llista) aplica (map f) a llista i avalua la llista resultant amb l'estratègia (parList estrat).[45]
Compte! parMap no s'ha d'utilitzar amb l'estratègia rpar, ja que activaria l'execució paral·lela doblement per cada element
parMap estrat f = withStrategy (parList estrat). map f

Càlcul típic amb estratègies.

{-# LANGUAGE PackageImports #-}
import "parallel" Control.Parallel.Strategies (parMap, rpar, rseq)
import Control.Category ((>>>)) --- f >>> g = g. f

-- parMap :: Strategy b → (a → b) → [a] → [b]
-- parMap strat f = map f >>> withStrategy (parList strat)

càlcul x = 2 * sqrt x

mapejaEnParal·lel = parMap rseq -- avalua en paral·lel a WHNF (per rseq)

resultatsDelCàlculEnParal·lel = mapejaEnParal·lel càlcul (llista 2)
 where llista n = [n..1100000] :: [Double]

main = print $ sum resultatsDelCàlculEnParal·lel

compilació i exec.

# cal compilar amb "-rtsopts" per poder afegir opcions del RunTimeSystem en temps d'execució
ghc --make -threaded -rtsopts prova.hs

# opcions: estadístiques: -s,
# utilitza tots els nuclis de CPU: -N,
# pila de 100MB: -K100m
./prova +RTS -K100m -s -N

Exemple: #Reducció en paral·lel mitjançant estratègies.

Paral·lelisme de dades

[modifica]

Tractament paral·lel de dades basat en biblioteques ad-hoc.

  • procés paral·lel de vectors pluridimensionals
  • "Data parallel haskell"
  • Ús de les instruccions vectoritzades SIMD (Single Instruction Multiple Data) de les CPU.
  • Paral·lelisme GPGPU

Vegeu Compilador Haskell de Glasgow#Paral·lelisme de dades

Exemples de concurrència

[modifica]

Concurrència simple amb MVars - Productor-consumidor

[modifica]

Amb variables de sincronització per baldes (blocants) MVar.[46]

forkIO
Engega fil d'execució lleuger del planificador del Haskell en multiprocés cooperatiu.
putMVar mvar valor
bloca el fil d'execució si la variable MVar és plena (ocupada) fins que estigui disponible (buida); llavors l'omple amb el valor i continua.
takeMVar mvar
bloca el fil d'execució si la variable MVar és buida fins que la li omplin; llavors en recupera el valor, la deixa buida i continua.
module Main(main) where

import Control.Concurrent (forkIO, threadDelay, MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)
import Control.Monad (forM_, when)
import System.IO (stdout, hFlush)
import Text.Printf (printf)
import Data.Time.Clock (getCurrentTime)
import Data.Time.LocalTime (utcToLocalZonedTime)
import Data.Time.Format (FormatTime, formatTime, defaultTimeLocale)

obtenir_hora :: IO String
obtenir_hora = do
 local_t <- getCurrentTime >>= utcToLocalZonedTime
 return $ formatTime defaultTimeLocale "%T" local_t

productor :: MVar Int -> IO ()
productor mv_bústia = do
 forM_ [3,2..0] $ \compte_enrere -> do -- per als valors de la llista
 threadDelay 1000000 -- espera microsegons
 putMVar mv_bústia compte_enrere -- espera disponibilitat i posa valor a la MVar

consumidor :: MVar Int -> IO ()
consumidor mv_bústia = do
 x <- takeMVar mv_bústia -- espera la MVar, se n'apropia el contingut i el retorna
 hora <- obtenir_hora

 printf "%s - consumidor: recollit %d\n" hora x
 hFlush stdout
 when (x > 0) $ consumidor mv_bústia -- tornem-hi, si no és el darrer valor de x

main = do
 -- nova MVar per la sicronització productor / consumidor
 mv_bústia <- newEmptyMVar :: IO (MVar Int)

 -- noves MVar per la sincronització de finalització de fil d'exec. com a semàfors binaris
 mv_fi_prod <- newEmptyMVar :: IO (MVar Bool)
 mv_fi_consum <- newEmptyMVar :: IO (MVar Bool)

 -- forkIO: engega fil d'execució
 consumidor_id <- forkIO $ consumidor mv_bústia `finally`
 putMVar mv_fi_consum True -- assenyala l'acabament a la MVar
 -- despertant el primer dels fils blocats per la mateixa

 productor_id <- forkIO $ productor mv_bústia `finally`
 putMVar mv_fi_prod True -- assenyala l'acabament a la MVar

 -- emulem amb MVar's la feina de ''pthread_join()'' de l'Unix
 -- per esperar la finalització dels fils d'exec. creats

 takeMVar mv_fi_prod -- espera la fi del productor (MVar com a semàfor binari)
 takeMVar mv_fi_consum -- espera la fi del consumidor
 putStrLn "fi del programa"

produeix la sortida següent:

11:32:03 - consumidor: recollit 3
11:32:04 - consumidor: recollit 2
11:32:05 - consumidor: recollit 1
11:32:06 - consumidor: recollit 0
fi del programa

Client-servidor - Canals amb cues d'entrada (Chan)

[modifica]
Client-servidor, canalitzant la impressió

Canals no acotats (en la dimensió de la cua) (Control.Concurrent.Chan)"[47]de primera classe" (és a dir, que es pot passar com a paràmetre)[48]

  • forkIO: Engega fil d'execució lleuger del planificador del Haskell.
  • forkOS: engega un fil lligat a un del sistema operatiu, per al cas de voler utilitzar característiques dels fils d'exec. del sistema, com ara crides FFI a biblioteques del sistema o bé allotjament lligat al fil.[12]
  • comunic. per cues il·limitades
writeChan canal
afegeix a la cua il·limitada i retorna tot seguit (sense blocar) (comunicació asíncrona).
readChan canal
bloca si la cua del canal és buida
  • resposta per MVars
  • semàfors:
newQSem valorInicial
nou semàfor
signalQSem semàfor
incrementa semàfor i assenyala
waitQSem semàfor
si semàfor > 0 llavors decrementa i continua, sinó espera
  • A l'exemple el client encua al canal torn, el parell (comanda, ref. resposta (mv_resposta)), i queda a l'espera de la resposta.
module Main(main) where

import Control.Concurrent (forkIO, forkOS, threadDelay,
 MVar, newEmptyMVar, putMVar, takeMVar, tryPutMVar)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)

import Control.Exception (finally)
import Data.IORef (IORef, newIORef, readIORef, writeIORef, modifyIORef)
import Control.Monad (forM_, when)
import System.IO (stdout, hFlush)
import Text.Printf (printf)
import Data.Time.Clock (getCurrentTime)
import Data.Time.LocalTime (utcToLocalZonedTime)
import Data.Time.Format (FormatTime, formatTime, defaultTimeLocale)
import Data.Function ((&)) -- (&): aplicació cap enrere

data TInfo = InfoDelClient Int | InfoDelServidor String Int | InfoPlega -- missatges a l'informador

type Canal_Comanda = Chan (Int, MVar_Resposta)
type MVar_Resposta = MVar Int
type Canal_Info = Chan TInfo

obtenir_hora :: IO String -- formulació alternativa amb 'fmap':
obtenir_hora = getCurrentTime >>= utcToLocalZonedTime -- (>>=) i (&) tenen la mateixa precedència i assoc.: infixl 1
 & fmap (formatTime defaultTimeLocale "%T")

client :: Canal_Comanda -> Canal_Info -> IO ()
client chan_torn chan_info = do
 forM_ [3,2..0] $ \cnt -> do -- llista de valors a passar, finalitzant en zero
 threadDelay 1000000 -- espera microsegons
 writeChan chan_info (InfoDelClient cnt) -- no bloca (asíncron, encua al canal i continua)

 mv_resposta <- newEmptyMVar :: IO (MVar Int)
 -- encua la comanda passant la ref. de la mvar de resposta.
 writeChan chan_torn (cnt, mv_resposta)
 takeMVar mv_resposta -- bloca (espera resposta per continuar)

obtenir_resposta :: Int -> IORef Int -> IO Int
obtenir_resposta x ref_estat = readIORef ref_estat >>= (\s -> return (s+x)) -- la que vulgueu

servidor :: Canal_Comanda -> Canal_Info -> IORef Int -> IO ()
servidor chan_torn chan_info ref_estat = do

 (x, mv_resposta) <- readChan chan_torn -- bloca fins obtenir comanda al chan_torn
 hora <- obtenir_hora
 resp <- obtenir_resposta x ref_estat

 tryPutMVar mv_resposta resp -- respon si és possible

 writeChan chan_info (InfoDelServidor hora x) -- no bloca (asíncron, encua al canal i continua)
 -- mentre rebem x > 0 fem bucle, altrament acaba.
 when (x > 0) $ servidor chan_torn chan_info ref_estat -- tornem-hi, si no tenim el darrer valor de x

-- informador: gestiona sortides a ''stdout'' en un sol fil d'execució
informador :: Canal_Info -> IO ()
informador chan_info = do
 info <- readChan chan_info -- bloca si la cua és buida
 case info of
 InfoDelClient intValor -> do
 printf "client: comanda %d\n" intValor
 hFlush stdout
 informador chan_info -- tornem-hi

 InfoDelServidor strHora intValor -> do
 printf "%s - servidor: recollit %d\n" strHora intValor
 hFlush stdout
 informador chan_info -- tornem-hi

 InfoPlega -> return () -- acaba

main = do
 ref_estat <- newIORef 0 :: IO (IORef Int) -- ref. no sincronitzada (la manipula un sol fil)
 chan_torn <- newChan :: IO (Canal_Comanda) -- cua de comandes al servidor

 chan_informacio <- newChan :: IO (Canal_Info) -- cua d'impressió

 -- semàfors per a l'espera d'acabament dels fils d'execució
 semàfor <- newQSem 0 -- semàfor per a client i servidor
 mv_fi_info <- newEmptyMVar :: IO (MVar Bool) -- semàfor amb MVar per a l'informador

 -- malgrat que la gestió de ''stdout'' pel fil d'exec. de l'informador
 -- no requereix ''bound threads'', li poso el forkOS per trencar el tabú.

 informador_id <- forkOS {- per les crides externes -} $ informador chan_informacio
 `finally` putMVar mv_fi_info True

 servidor_id <- forkIO $ servidor chan_torn chan_informacio ref_estat
 `finally` signalQSem semàfor -- incrementa semàfor i assenyala
 client_id <- forkIO $ client chan_torn chan_informacio
 `finally` signalQSem semàfor -- incrementa semàfor i assenyala

 -- espera finalització dels dos processos, el client i el servidor
 waitQSem semàfor -- espera fins que semàfor > 0, llavors decrementa i continua
 waitQSem semàfor -- espera fins que semàfor > 0, llavors decrementa i continua

 writeChan chan_informacio InfoPlega
 takeMVar mv_fi_info -- espera fi informador

 putStrLn "fi del programa"

dona la següent sortida:

client: comanda 3
15:18:55 - servidor: recollit 3
client: comanda 2
15:18:56 - servidor: recollit 2
client: comanda 1
15:18:57 - servidor: recollit 1
client: comanda 0
15:18:58 - servidor: recollit 0
fi del programa

Concurrència condicionada amb variables transaccionals (TVar) - Mònada STM (transaccions de memòria per programari)

[modifica]

Només al compilador GHC.[49] Les transaccions de memòria eviten blocar els fils d'execució, descartant canvis a les variables transaccionals si no es completa, excepte en cas que hi posem condicions forçant el reintent amb la clàusula "retry".

L'evolució de la transacció es modela com a aplicacions en una Mònada STM, inicials de "Software Transactional Memory".

En aquest exemple les transaccions[50] s'efectuen sobre variables transaccionals[51] TVar (accés sincronitzat per STM) i s'encapsulen en una mònada STM.

També substituïm les MVar (comunic. síncrona) per TMVar, i les Chan (comunic. asíncrona) per TChan, per quedar lliures de problemes de bloquejos.[52]

atomically
avalua una expressió de la mònada STM, admetent o tot o no res dels canvis a les variables transaccionals, oferint el resultat com a efecte global (mònada IO).
retry
provoca el reintent si no es donen les condicions esperades i reintenta una transacció alternativa per la branca "orElse" si existeix, i si no, bloca el fil d'execució fins que es modifiqui alguna de les variables transaccionals implicades en la transacció.
orElse
introdueix una transacció alternativa, que s'avalua si la precedent fa "retry"
always
comprova invariant i si falla, dispara un error "Transactional invariant violation" finalitzant el programa
catchSTM
atrapa excepcions dins la mònada STM

A partir de GHC 6.12, STM desapareix de la biblioteca pral. i, si no s'ha fet la instal·lació amb la Plataforma Haskell que l'incorpora, caldrà carregar el paquet stm del Hackage.[53]

  • La concurrència a l'exemple utilitza la biblioteca Async:[54]
withAsync acció_io $ \ async -> altres accions al fil d'exec. original
llença l'acció_io en un altre fil d'execució, asíncronament i continua amb les altres accions
wait async
espera finalització del fil d'exec. llançat asíncronament, identificat per async i en retorna el resultat
{-| transaccions als comptes; fitxer stm_part1.hs -}
{-# LANGUAGE PackageImports #-}

module Stm_part1 (aporta_quan_cal_i_obtenir_saldo,
 carrega_en_un_o_altre_compte
) where

import "stm" Control.Monad.STM (STM, retry, orElse, always)
import "stm" Control.Concurrent.STM.TVar (TVar, readTVar, writeTVar)
import Control.Monad (when)

saldo_baix = 4

aporta_quan_saldo_baix :: TVar Int -> Int -> STM ()
aporta_quan_saldo_baix tv_compte aportacio = do

 saldo <- readTVar tv_compte
 when (saldo > saldo_baix) retry -- bloca mentre no calgui afegir-hi diners,
 -- fins que es modifiqui alguna TVar, llavors reintenta
 let nou_saldo = saldo + aportacio
 writeTVar tv_compte nou_saldo -- executa l'aportació

invariant :: TVar Int -> STM Bool
invariant tv_compte = do
 saldo <- readTVar tv_compte
 return $ saldo >= 0

retira_fons :: TVar Int -> Int -> STM ()
retira_fons tv_compte quantitat = do

 saldo <- readTVar tv_compte
 when (saldo < quantitat) retry -- si no hi ha saldo reintenta la transacció alternativa
 -- o bloca i torna a la inicial si no hi ha més alternatives
 writeTVar tv_compte $ saldo - quantitat
 always $ invariant tv_compte -- comprova invariant de la transacció

carrega_en_un_o_altre_compte :: TVar Int -> TVar Int -> Int -> STM Int
carrega_en_un_o_altre_compte tv_compteA tv_compteB quantitat = do

 retira_fons tv_compteA quantitat `orElse` -- alternativa de transacció
 retira_fons tv_compteB quantitat

 saldoA <- readTVar tv_compteA
 saldoB <- readTVar tv_compteB
 return $ saldoA + saldoB

aporta_quan_cal_i_obtenir_saldo :: TVar Int -> TVar Int -> Int -> STM Int
aporta_quan_cal_i_obtenir_saldo tv_compteA tv_compteB quantitat = do

 aporta_quan_saldo_baix tv_compteB quantitat
 saldoA <- readTVar tv_compteA
 saldoB <- readTVar tv_compteB
 return $ saldoA + saldoB

Principal engegant fils d'execució per a creditor, deutor i informador (que gestiona stdout).

{-| fitxer stm_main.hs -}
{-# LANGUAGE PackageImports, ScopedTypeVariables #-}
module Main(main) where

-- import Prelude hiding (catch) -- obsolet, 'catch' ja no és al Prelude
import Stm_part1

import "async" Control.Concurrent.Async (withAsync, wait, asyncThreadId) -- futurs

import Control.Concurrent (threadDelay, killThread)

import "stm" Control.Monad.STM (STM, atomically)
import "stm" Control.Concurrent.STM.TVar (TVar, newTVar)
-- import "stm" Control.Concurrent.STM.TMVar (TMVar, newEmptyTMVarIO, putTMVar, takeTMVar) -- versió antiga
import "stm" Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan)

import Control.Exception (SomeException, catch, finally, mask_)
import Control.Monad (forM_, forever)
import System.IO (stdout, hFlush)
import Text.Printf (printf)

default (Int, Double) -- seq. de tipus per resoldre ambigüitats dels literals numèrics

data TInfo = InfoDelCreditor Int Int | InfoDelDeutor Int | InfoPlega -- missatges a l'informador

pagament = 3

-- creditor passa rebuts al cobrament de manera periòdica

creditor :: TVar Int -> TVar Int -> TChan TInfo -> IO ()
creditor tv_compteA tv_compteB tchan_informacio = do

 forM_ [1..6] $ \període -> do -- per als períodes de la llista
 threadDelay 1000000 -- espera final del període (en microsegons)
 -- carrega en un o altre compte
 saldo_conjunt <- atomically $ carrega_en_un_o_altre_compte tv_compteA tv_compteB pagament
 -- informa del cobrament
 atomically $ writeTChan tchan_informacio $ InfoDelCreditor període saldo_conjunt

-- deutor aporta diners al compte, quan el saldo baixa per sota d'un valor ''saldo_baix''

deutor :: TVar Int -> TVar Int -> TChan TInfo -> IO ()
deutor tv_compteA tv_compteB tchan_informacio = do

 (forever $ do
 -- mask_ emmascara interrupcions asíncrones (externes) (ex. killThread que llançarem des del fil principal)
 mask_ $ do -- amb mask_, el bloc no serà interromput
 saldo_conjunt <- atomically $ aporta_quan_cal_i_obtenir_saldo tv_compteA tv_compteB pagament
 -- informa de la nova aportació
 atomically $ writeTChan tchan_informacio $ InfoDelDeutor saldo_conjunt
)
 `catch` (\(_excep :: SomeException) -> return ()) -- cas d'excepció asíncrona "killThread" del fil principal, acaba

-- informador: gestiona sortides a ''stdout'' en un sol fil d'execució
-- vehicula missatges a imprimir a través del canal transaccional TChan (versió transac. de Chan)

informador :: TChan TInfo -> IO ()
informador tchan_informacio = do
 info <- atomically $ readTChan tchan_informacio -- bloca mentre canal buit
 case info of
 InfoDelCreditor període saldo -> do
 printf "creditor: període %d, saldo %d\n" període saldo
 hFlush stdout
 informador tchan_informacio -- tornem-hi

 InfoDelDeutor saldo -> do
 printf "deutor: saldo %d\n" saldo
 hFlush stdout
 informador tchan_informacio -- tornem-hi

 InfoPlega -> return ()

main = do
 tv_compteA <- atomically $ newTVar 10 -- compte A
 tv_compteB <- atomically $ newTVar 4 -- compte B

 tchan_informacio <- atomically (newTChan :: STM (TChan TInfo)) -- canal per a la informació a imprimir

 -- llança "informador" en un altre fil d'execució, i executa el bloc "do" al fil principal
 withAsync (informador tchan_informacio) $ \asyncInformador -> do 

 withAsync (deutor tv_compteA tv_compteB tchan_informacio) $ \asyncDeutor -> do

 withAsync (creditor tv_compteA tv_compteB tchan_informacio) $ \asyncCreditor -> do

 wait asyncCreditor -- espera que acabi el fil del creditor

 killThread (asyncThreadId asyncDeutor) -- genera excepció asíncrona al fil de asyncDeutor
 wait asyncDeutor -- espera que acabi el fil del deutor

 atomically $ writeTChan tchan_informacio InfoPlega -- afegeix ordre de plegar al canal de l'informador

 wait asyncInformador -- espera que acabi el fil de l'informador

 putStrLn "fi del programa"

Compilació i exec.

ghc --make -threaded stm_part1.hs stm_main.hs -o stm_main
./stm_main

Exemples de paral·lelisme

[modifica]

Reducció en paral·lel mitjançant estratègies

[modifica]
{-# LANGUAGE PackageImports #-}
import "parallel" Control.Parallel.Strategies
import "HUnit" Test.HUnit 
import Control.Exception as CE

-- de la definició: type Strategy a = a -> Eval a

estratSumaSegment :: Num a => Strategy [a]
-- equival a
-- estratSumaSegment :: Num a => [a] -> Eval [a]

estratSumaSegment llista = rseq [sum llista] -- avalua la suma d'un segment en un fil embolcallant el resultat en el mateix tipus de contenidor que l'origen

-- parteix llista en segments de n elements
segments :: Int -> [a] -> [[a]]
segments _ [] = [[]]
segments n xs = if null zs then [ys]
 else ys : segments n zs -- recursivitat final diferida (''mòdulo cons'')
 where (ys, zs) = splitAt n xs -- parteix a l'índex n

sumaEnParal·lel :: Num a => Int -> [a] -> a
sumaEnParal·lel llargadaSegment llista = CE.assert (llargadaSegment > 1) -- precondició
 $ sum $ concat totalsSegments
 where
 totalsSegments = segments llargadaSegment llista
 `using` parList estratSumaSegment -- avalua, paral·lelitzant, cadascun dels segments

prova llista = TestCase (assertEqual msg esperat calculat)
 where
 calculat = sumaEnParal·lel 1000 llista
 esperat = sum llista
 msg = "sumaEnParal·lel en segments de 1000"

tests = TestList [TestLabel "test1" $ prova [1..10000]]

main = runTestTT tests

Referències

[modifica]
  1. «mòdul Control.Concurrent Vegeu Scheduling». Arxivat de l'original el 2012-08-02. [Consulta: 1r desembre 2011].
  2. 2,0 2,1 GHC's implementation of concurrency Arxivat 2012-08-02 at Archive.is(anglès)
  3. Control.Concurrent.MVar(anglès)
  4. Control.Concurrent.Chan(anglès)
  5. Control.Concurrent.STM.TVar(anglès)
  6. Control.Concurrent.STM.TMVar(anglès)
  7. Control.Concurrent.STM.TChan(anglès)
  8. 8,0 8,1 8,2 8,3 Variables MVar(anglès)
  9. Control.Concurrent.MVar.Strict(anglès)
  10. 10,0 10,1 10,2 El planificador del GHC - fils d'exec. del sistema i fils del GHC (anglès)
  11. «Concurrency – Haskell Prime». Arxivat de l'original el 2011-12-09. [Consulta: 1r desembre 2011].(anglès)
  12. 12,0 12,1 12,2 Bound threads (fils d'execució lligats als del sistema)(anglès)
  13. Concurrència i crides externes (FFI) al compilador GHC(anglès)
  14. Control.Concurrent.forkIO(anglès)
  15. Control.Concurrent.forkOS(anglès)
  16. Control.Concurrent.forkOn(anglès)
  17. Control.Concurrent.forkFinally(anglès)
  18. La mònada Par - presentació Arxivat 2013-08-23 a Wayback Machine. (anglès)
  19. La mònada Par(anglès)
  20. «Haskell Exchange 2012 - Simon Marlow - High performance concurrency». Arxivat de l'original el 2013-02-03. [Consulta: 18 gener 2013].
  21. El paquet Async
  22. mòdul Control.Concurrent.Async combinadors(anglès)
  23. mòdul Control.Concurrent.Async newtype Concurrently(anglès)
  24. CHP.pdf(anglès) Communicating Haskell Processes: Composable Explicit Concurrency Using Monads
  25. Univ. de Kent - Communicating Haskell Processes - pàgina inicial
  26. paquet chp
  27. Actors with Multi-Headed Receive Clauses (anglès)
  28. Cloud Haskell(anglès)
  29. A Cloud Haskell Appetiser(anglès) Un aperitiu de Cloud Haskell
  30. Vídeo: HIW 2012. Duncan Coutts: Cloud Haskell 2.0(anglès)
  31. Control.Concurrent.throwTo(anglès)
  32. safe-exceptions - README(anglès)
  33. GHC.Conc.Sync.setUncaughtExceptionHandler(anglès)
  34. The Unhandled Exception Handler Arxivat 2012-09-06 a Wayback Machine.(anglès)
  35. primitives de paral·lelisme(anglès)
  36. «Haskell Paral·lel». Arxivat de l'original el 2014-10-25. [Consulta: 18 agost 2017].
  37. GHC seq vs. pseq(anglès) Comparació de les primitives seq i pseq
  38. GHC - primitiva pseq(anglès)
  39. «GHC - Utilitzant multiprocés simètric SMP». Arxivat de l'original el 2014-09-27. [Consulta: 18 agost 2017].
  40. GHC - Hints (cat:Pistes) for using SMP parallelism Arxivat 2014-09-27 a Wayback Machine. (anglès)
  41. 41,0 41,1 Glasgow Parallel Haskell (anglès) Estratègies de paral·lelisme
  42. 42,0 42,1 Control.Parallel.Strategies(anglès)
  43. Seq no more: Better Strategies for Parallel Haskell Arxivat 2017-08-13 a Wayback Machine.(anglès) Prou de seq (seqüencial) - millors estratègies per al Haskell paral·lel
  44. haskellWiki - paral·lelisme(anglès)
  45. parMap
  46. Variables de sicronització MVar's Arxivat 2010-04-17 a Wayback Machine. (anglès)
  47. Control.Concurrent.Chan Arxivat 2014-07-15 a Wayback Machine. (anglès)
  48. Communicating .. - First Class Channels Arxivat 2010-07-06 a Wayback Machine.(anglès)
  49. API de concurrència del compilador GHC Arxivat 2011-11-19 a Wayback Machine. (anglès)
  50. HaskellWiki - STM - Transaccions de memòria per software (anglès)
  51. Variables transaccionals
  52. Avaluació de models de programació multicor (PDF)[Enllaç no actiu](anglès)
  53. Què se n'ha fet de Control.Concurrent.STM(anglès)
  54. mòdul Control.Concurrent.Async(anglès)

Enllaços externs

[modifica]