data MChan a
data Port a
newMChan :: STM (MChan a)
-- Write an item to the channel:
writeMChan :: MChan a -> a -> STM ()
-- Create a new read port:
newPort :: MChan a -> STM (Port a)
-- Read the next buffered item:
readPort :: Port a -> STM a
We represent the buffered data by a linked list, or Chain, of
items, with a transactional variable in each tail, so that it can
be extended by writeMChan:
type Chain a = TVar (Item a)
data Item a = Empty | Full a (Chain a)
An MChan is represented by a mutable pointer to the “write”
end of the chain, while a Port points to the read end:
type MChan a = TVar (Chain a)
type Port a = TVar (Chain a)
With these definitions, the code writes itself:
newMChan = do {c <- newTVar Empty; newTVar c}
newPort mc = do {c <- readTVar mc; newTVar c}
readPort p
  = do { c <- readTVar p
       ; i <- readTVar c
       ; case i of
           Empty -> retry
           Full v c' -> do { writeTVar p c'
                           ; return v }}
writeMChan mc v
  = do { c <- readTVar mc
       ; c' <- newTVar Empty
       ; writeTVar c (Full v c')
       ; writeTVar mc c' }
Notice the use of retry to block readPort when the buffer is empty. Although this implementation is very simple, it
ensures that each item written into the MChan is delivered to
every Port that exists when the item is written; it allows multiple writers (their writes are interleaved); it allows multiple readers on each port (data read by
one is not seen by the other readers on that port); and when
a port is discarded, the garbage collector recovers the buffered data.
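To make these properties easy to check, here is a minimal, self-contained transcription of the code above for GHC's stm package, where the paper's atomic is spelled atomically. The helpers tryReadPort and drainPort are hypothetical additions used only to observe what a port has buffered without blocking.

```haskell
import Control.Concurrent.STM

-- Buffered data: a linked chain with a TVar in each tail.
type Chain a = TVar (Item a)
data Item a = Empty | Full a (Chain a)

type MChan a = TVar (Chain a)  -- points at the write end of the chain
type Port a  = TVar (Chain a)  -- points at this port's read position

newMChan :: STM (MChan a)
newMChan = newTVar Empty >>= newTVar

-- A new port starts at the current write end, so it sees only later items.
newPort :: MChan a -> STM (Port a)
newPort mc = readTVar mc >>= newTVar

readPort :: Port a -> STM a
readPort p = do
  c <- readTVar p
  i <- readTVar c
  case i of
    Empty -> retry              -- block until a writer extends the chain
    Full v c' -> do
      writeTVar p c'            -- advance this port past the item
      return v

writeMChan :: MChan a -> a -> STM ()
writeMChan mc v = do
  c  <- readTVar mc
  c' <- newTVar Empty
  writeTVar c (Full v c')       -- fill the empty tail cell
  writeTVar mc c'               -- move the write end forward

-- Hypothetical helper: a non-blocking read (retry inside orElse yields Nothing).
tryReadPort :: Port a -> STM (Maybe a)
tryReadPort p = fmap Just (readPort p) `orElse` return Nothing

-- Hypothetical helper: collect everything currently buffered on a port.
drainPort :: Port a -> IO [a]
drainPort p = do
  r <- atomically (tryReadPort p)
  case r of
    Nothing -> return []
    Just v  -> (v :) <$> drainPort p
```

With two ports created before writing 1, 2 and 3, draining either port yields [1,2,3]; a port created afterwards sees only the items written after its creation.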
More complicated variants are simple to program. For example, suppose we wanted to ensure that the writer could
get no more than N items ahead of the most advanced reader. One way to do this would be for the writer to include a serially increasing Int in each Item, and have a shared TVar
holding the maximum serial number read so far by any reader. It is simple for the readers to keep this up to date, and for
the writer to consult it before adding another item.
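The bounded variant just described might be sketched as follows, again for GHC's stm package. This is one possible encoding under stated assumptions: serial numbers start at 0, a single shared TVar records the highest serial read by any port (initially -1), and the writer uses check (which retries when its argument is False) to wait until it is fewer than N items ahead. The names BMChan, BPort, newBMChan, newBPort, writeBMChan and readBPort are hypothetical.

```haskell
import Control.Concurrent.STM

-- Each item now carries a serial number, as suggested in the text.
type Chain a = TVar (Item a)
data Item a = Empty | Full Int a (Chain a)

data BMChan a = BMChan
  { bmBound   :: Int             -- N: how far ahead the writer may get
  , bmWrite   :: TVar (Chain a)  -- write end of the chain
  , bmNextSer :: TVar Int        -- serial number for the next item
  , bmMaxRead :: TVar Int        -- highest serial read by any reader (-1 = none)
  }

-- A port shares the channel's max-read counter and has its own read position.
data BPort a = BPort (TVar Int) (TVar (Chain a))

newBMChan :: Int -> STM (BMChan a)
newBMChan n = do
  c <- newTVar Empty
  BMChan n <$> newTVar c <*> newTVar 0 <*> newTVar (-1)

newBPort :: BMChan a -> STM (BPort a)
newBPort ch = BPort (bmMaxRead ch) <$> (readTVar (bmWrite ch) >>= newTVar)

writeBMChan :: BMChan a -> a -> STM ()
writeBMChan ch v = do
  s <- readTVar (bmNextSer ch)
  m <- readTVar (bmMaxRead ch)
  -- s - m - 1 items are written but unseen by the most advanced reader;
  -- if that is already N, retry until some reader catches up.
  check (s - m - 1 < bmBound ch)
  c  <- readTVar (bmWrite ch)
  c' <- newTVar Empty
  writeTVar c (Full s v c')
  writeTVar (bmWrite ch) c'
  writeTVar (bmNextSer ch) (s + 1)

readBPort :: BPort a -> STM a
readBPort (BPort maxRead p) = do
  c <- readTVar p
  i <- readTVar c
  case i of
    Empty -> retry
    Full s v c' -> do
      writeTVar p c'
      modifyTVar' maxRead (max s)  -- keep the shared high-water mark up to date
      return v
```

Note that with this encoding, if no port ever reads, the writer blocks once it has written N items.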
4.3. Merge
We have already stressed that transactions are composable.
For example, to read from either of two multicast channels, we can write:
atomic (readPort p1 `orElse` readPort p2)
No changes need to be made to either multicast channel.
If neither port has any data, the STM machinery will cause
the thread to wait simultaneously on the TVars at the extremity of each channel.
Equally, the programmer can wait on a condition that involves a mixture of MVars and MChans (perhaps the multicast channel indicates ordinary data and an MVar is being
used to signal a termination request), for instance:
atomic (readPort p1 `orElse` takeMVar m1)
This example is contrived for brevity, but it shows how operations taken from different libraries, implemented without
anticipation of their being used together, can be composed.
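This kind of composition can be tried directly with GHC's stm package, which provides library analogues of both pieces: a broadcast TChan (created with newBroadcastTChanIO, with dupTChan playing a role similar to newPort) and a TMVar, the STM counterpart of an MVar. The sketch below assumes those types stand in for MChan and MVar; Event and nextEvent are hypothetical names, and the result is tagged so the caller can tell which source fired.

```haskell
import Control.Concurrent.STM

-- Which source produced the value: ordinary data, or a termination request.
data Event a = Data a | Stop
  deriving (Eq, Show)

-- Wait on a channel port and a termination TMVar at the same time.
-- Neither library was written with the other in mind; orElse composes them.
nextEvent :: TChan a -> TMVar () -> STM (Event a)
nextEvent port stopVar =
  (Data <$> readTChan port) `orElse` (takeTMVar stopVar >> return Stop)
```

If both sources are ready, the channel wins because orElse tries its left branch first; swapping the two arguments would give the termination request priority.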
In the most general case, we can select between values received from a number of different sources. Given a list of
computations of type STM a we can take the first value to be
produced from any of them by defining a merge operator:
merge :: [STM a] -> STM a
merge = foldr1 orElse
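As a small illustration, the sketch below applies merge to a list of sources built from a hypothetical helper, takeSlot, which takes the value out of a TVar slot and calls retry if the slot is empty. It assumes GHC's stm package, where atomic is spelled atomically.

```haskell
import Control.Concurrent.STM

-- merge as defined in the text: try each source in order, left to right.
merge :: [STM a] -> STM a
merge = foldr1 orElse

-- Hypothetical source for illustration: a TVar slot that may hold a value.
-- Taking from an empty slot calls retry, so merge moves on to the next source.
takeSlot :: TVar (Maybe a) -> STM a
takeSlot slot = do
  m <- readTVar slot
  case m of
    Nothing -> retry
    Just v  -> do
      writeTVar slot Nothing  -- consume the value
      return v
```

merge is left-biased: when several sources are ready, the earliest one in the list is taken. If every source retries, the whole merge retries, so atomically (merge ...) blocks until one of them can proceed. Because foldr1 fails on an empty list, one could instead write foldr orElse retry, which behaves the same on non-empty lists and makes merge [] simply retry.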
(The function foldr1 f simply reduces a list $[a_1, a_2, \ldots, a_n]$ to
the value $a_1 \mathbin{f} (a_2 \mathbin{f} (\cdots \mathbin{f} a_n))$, writing f as an infix operator.) This example is childishly
simple in STM-Haskell. In contrast, a function of type
mergeIO :: [IO a] -> IO a
is unimplementable in Concurrent Haskell, or indeed in
other settings with operations built from mutual exclusion
locks and condition variables.
5. Implementation
Since our original paper, there has been a great deal of work on building fast implementations of STM, along with hardware support to replace or accelerate them.[14] The techniques we have
used in STM-Haskell are broadly typical of much of this work
and so we do not go into the details here. In summary, however, while a transaction is running, it builds up a private log
that records the TVars it has accessed, the values it has read
from them and (in the case of writes) the new values that it
wants to store to them. When a transaction attempts to commit, it has to reconcile this log with the heap. Logically this
has two steps: validating the transaction to check that there