Базовая работа с MVar
В данном посте будет краткое описание одного из базовых примитивов для для передачи данным ‘MVar’ (Mutable Variable, эм-вар).
Для начала хочется дать очень кратное описание основных структур данных, позволяющих получать общий доступ к изменяемым переменным из разных потоков. Стандартная библиотека представляет три глобальные группы
- IORef - изменяемая переменная, над которой можно производить операции из разных потоков (поддерживает атомарные операции) все операции должны быть чистыми
- TVar - примитив для работы с транзакционной памятью, работа с ним возможна только внутри
STM
монады (поддерживает объединяемые транзакции) - MVar - низкоуровневый примитив при изменении, которого можно производить произвольные
IO
операции.
Исходя из написанного выше видно, что MVar
следует использовать тогда, когда при работе с переменной идёт и работа с ‘IO’, в этом случае другие механизмы не могут предоставить покрывающее данную задачу решение.
Краткий обзор
data MVar a
можно рассматривать как ящик защищенный мьютексом, в котором может или находиться или не находиться значение.
У MVar
переменных есть достаточно простой базовый API:
data MVar a -- abstract
newEmptyMVar :: IO (MVar a)
newMVar :: a -> IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()
Операции newEmptyMVar
и newMVar
создают новую переменную, пустую или со значением соотвественно. Операция takeMVar
получает текущее значение, оставляя MVar
пустым или блокируется в случае если MVar
пуст. Операция putMVar
наоборот кладёт значение в MVar
если он пуст или блокируется, если в MVar
уже есть значение.
Небольшой пример того, как может использоваться MVar
import Control.Concurrent
main = do
result <- newEmptyMVar -- (1)
forkIO $ do -- (2)
r <- getUrl "http://ruhaskell.org/" -- (3)
putMVar result r -- (4)
doSmthElse
r <- takeMVar result -- (5)
print r
В этом коде мы создали ячейку для результата (1)
, запустили фоновое действие (2)
, получили результат со страницы (3)
, положили результат в ячейку (4)
, в главном потоке после завершения действия получили данные из ячейки (5)
.
В случае если все процессы заблокированы на MVar
, система исполнения может автоматически отслеживать такие ситуации, и выбрасывать исключение в таком случае:
main = do
m <- newEmptyMVar
takeMVar m
$ ./mvar3
mvar3: thread blocked indefinitely in an MVar operation
Вкратце такое происходит, если система исполнения “видит”, что ни у одного незаблокированного потока нет в области видимости MVar
-переменной, запись или чтение из которой могут разблокировать систему. Однако, часто такое описание может вызвать затруднение, подробно разные случае рассмотрены в посте Edward Z. Yang BlockedIndefinitelyOnMVar.
Использование MVar
В целом MVar
могут быть применены в следующих случаях:
как “одноместные” каналы передачи данных; в этом случае
MVar
можно рассматривать как базовый примитив для передачи данныхкак контейнер для разделяемых изменяемых данных;
как базовый компонент для более сложных конкурентных структур данных.
Примеры задач:
- Возвращение результата из подпотока. (Пример с рабочим процессом выше)
Синхронизация между потоками, например ожидание завершения потоков
main = do finished <- newEmptyMVar forkFinally (const $ putMVar finished ()) run takeMVar tid
Многопоточный доступ к состоянию:
st <- newMVar State replicateM 3 $ forkIO $ forever $ do r <- request modifyMVar st (processRequest r)
Блокирока ресурса при многопоточном доступе:
lock <- newMVar () replicateM 3 $ forkIO $ do ... withMVar_ lock doScarryThings
- каналы Control.Concurrent.Chan
Более полные примеры можно найти в книге Simon Marlow Parallel and Concurrent Programming in Haskell Глава 7.
Для упрощения работы с MVar
можно разбить функции на две группы для работы со случаем (1)
и (2)
, третий случай попадает в одну из групп в зависимости от конкретной задачи MVar
в структуре. И далее в каждом из случаев использовать функции из этой группы:
Передача данных | Контейнер для разделенных данных |
---|---|
putMVar |
withMVar |
takeMVar |
modifyMVar |
isEmptyMVar |
readMVar |
tryReadMVar,tryPutMVar |
swapMVar |
Естестсвенно, никто не требует, чтобы при работе с одним из случаев не использовать функции из другой группы, но в этом случае необходимо четко понимать, к каким последствиям это может приводить и учитывать при написании кода. Вот несколько интересных примеров:
один поток вызывает
withMVar
и выполняет действие, после чего должен положить значение назад, но, если в это время другой поток “положит” результат вMVar
(putMVar
), то первый поток будет заблокирован.Аналогичная ситуация может происходить с вызовом
readMVar
, который есть комбинацияtakeMVar
иputMVar
, в ghc начиная с 7.8 данное поведение исправлено и теперь операцияreadMVar
не освобождает “ячейку”.при использовании
takeMVar
иputMVar
, в случае исключенияMVar
останется не заполненым. (Если конечно исключения не обрабатываются должным образом, см. ниже)
В случае работы как описано выше проблемные ситуации сводятся к минимуму.
Гарантия “справедливости”
MVar
таже имеют очень важное свойство “справедливость” (fairness), которое обозначает следующую гарантию:
Ни один поток не может заблокироваться на
MVar
навечно, за исключением случая, если другой поток вечно “держит”MVar
.
Другими словами, если поток τ вызывает takeMVar
и выполняются регулярные операции putMVar
на той же переменной, то takeMVar
когда-нибудь завершится. В GHC данная гарантия реализована при помощи хранения всех потоков заблокированных на MVar
в очереди (FIFO), и при осводождении MVar
выполнение пробуждения потока и выполнения операции над MVar
за один атомарный шаг.
Важным следствием гарантии справедливости является то, что достаточно будить только один поток, что уменьшает нагрузку. (Это несправедливо для случая STM
, где в при обновлении TVar
должны быть пробужены все заблокированные потоки).
Исключения и MVar
Особенно аккуратным при работе с MVar
нужно быть в случае возможных асинхронных исключений. Поскольку необходимо следить за тем, чтобы даже в при наличии исключений актуальность MVar
не терялась.
Функции для работы с MVar
как с контейнером “знают” об исключениях, и, в случае, если исключение появляется, возвращают состояние MVar
. Делается это достаточно просто, но является интересным и простым примером работы с асинхронными исключениями:
modifyMVar m f = mask $ \restore -> do -- (1)
x <- takeMVar -- (2)
(y, n) <- restore f `onException` putMVar x -- (3)
putMVar y -- (4)
return n -- (5)
Здесь мы (1)
- заблокировали исключения, и получили функцию для из разблокировки restore
; (2)
получили текущее значение MVar
; (3)
- разблокировав исключения запустили пользовательскую функцию, если вовремя выполнения пользовательской функции пришло исключение, то мы возвращаем значение MVar
и выбрасываем исключение выше; (4)
- если мы дожили до этого участка кода без исключений, то они больше не случатся, мы кладём новое значение и возвращаем результат (5)
. Достаточно просто, но это и простейший примитив :).
Но все ли тут хорошо? Внимательный читатель спросит, а как же можно остановить поток, который заблокирован на takeMVar
, если исключения замаскированы. Тут есть две части ответа: во-первых IndefinitelyBlockedOnMVar
исключение магическое (работает как синхронное) и потому игнорирует маскировку, а во-вторых подобные операции, выполняющие блокирующие операции, снимают маскировку. Т.о. при чтении MVar
, в случае если MVar
пуст (и тред блокируется), асинхронное исключение может быть доставлено, даже если исключения заблокированы. Аналогично с записью и не-пустым MVar
. Данное правило является не самым простым, но его понимание важно при написании кода лишенного гонок. Для случаев, когда такое поведение нежелательно существует операция uninterruptibleMask
, не позволяющая разблокировать исключения, эта оперция не несёт дополнительной сложности, но в этом случае разблокировать ветку будет невозможно (кроме IndefinitelyBlockedOnMVar
). Подробно про работу и исключениями можно прочитать в документации модуля Control.Exception.
Но все ли теперь хорошо? На самом деле нет, существует ещё одна возможная проблема! В случае, если состояние MVar
должно отражать состояние внешнего мира, а операция внутри MVar
его изменяет, возможна рассинхронизация. Представьте следующий пример:
newConnection addr =
modifyMVar state $ \state -> -- (1)
mc <- connect addr -- (2)
-- (BOOM!)
case ec of
Left e -> return (state, ConnectionError e) -- (3)
Rigth r -> return (r:state, OK) -- (4)
В этом коде мы захватываем состояние (1)
; открываем новое соединение (2)
; в случае неуспеха возвращаем ошибку, не изменяя состояние (3)
; в случае успеха сохраняем новое состояние и возращаем соотвествующий результат (4)
. Теперь представим, что мы получаем асинхронное исключение в момент (BOOM!)
, здесь соедиение уже открыто, но ещё не сохранено в состояние, после получения исключение в state
возвращается старое значение - ошибка.
Одно из решений использовать маскировку самому:
newConnection addr = mask $ \release ->
modifyMVar state $ \state -> -- (1)
mc <- release (connect addr) -- (2), (BOOM!)
case ec of
Left e -> return (state, ConnectionError e) -- (3)
Rigth r -> return (r:state, OK) -- (4)
или если connect
размаскирует исключения:
newConnection addr = mask_ $
modifyMVar state $ \state -> -- (1)
mc <- connect addr -- (2), (BOOM!)
case ec of
Left e -> return (state, ConnectionError e) -- (3)
Rigth r -> return (r:state, OK) -- (4)
Данный код будет работать корректно. Также можно использовать методы modifyMVarMasked
, который работает аналогично последнему блоку кода.
Следует заметить, что STM
не имеет большей части из этих проблем и существенно проще в использовании, но там имеются свои сложности с включением IO
операций в транзакции (или написания серии блоков STM -> IO -> STM
) и с отсутствием гарантии справедливости.