Description
There are n number of bags which can be filled up with either large, medium or small items. The weight of items of a specific kind is fixed. The maximum weight carried by a bag is fixed.
Bags are filled up with items in a fixed order:
- 1. In sequential order from lowest to highest bag number
- 2. Exhaustively fill up the bag first with
- (a) large items, then
- (b) medium items, then
- (c) small items.
- (a) large items, then
The straightforward solution is to
- 1. Fill up bags with large items, then
- 2. with medium items, and then finally with
- 3. small items.
However, once a bag has been exhaustively filled up with large items (we either stop because the bag reached its maximum capacity of large items, or we're simply running out of large items), we can start filling up this bag with medium items. The point is that we don't have to wait until all bags are exhaustively filled up with large items.
This form of concurrency has clear connection to instruction pipelining. What we want to show here is how to come up with a concurrent solution to the bagger problem
We'll give two solutions:
- a solution using join-patterns extended with guards,
- from which we can easily derive a more 'elementary' solution using standard synchronization primitives such as Software Transactional Memory (STM).
Solution using Join extended with guards
The join-calculus (and languages implementing join-patterns) doesn't support guards. Hence, we make use of our join extension which supports guards. The guards we use here are in fact implicit (shared variables). Here we go:
The methods we introduce are:
Bag(kind,no,content) denotes a bag which
- currently expects to be filled with items of kind 'kind'
- no is the bags number
- content refers to the current content
Initially, all bags are of the form
Bag(Large,i,Empty)
Item(k) denotes an item of kind 'kind'
The above methods are all asynchronous.
We introduce two synchronous methods.
Iterator(kind,no) fills up bag number no with items of kind 'kind'.
Initially, there are three 'iterators':
- Iterator(Large,1)
- Iterator(Medium,1)
- Iterator(Small,1)
Each will run in its own thread. Each starts in 'Large' mode.
That is, only the large iterator can fill up the bags (sequentially).
Once the large iterator is done with bag number i, the bag is
'unlocked' by moving from 'Large' to 'Medium' mode and so on.
GetItem(kind,r) looks for an item of kind 'kind', the outcome
is reported in r of type Maybe Weight.
Here are the join patterns and their bodies.
(We assume that join patterns are executed from top to bottom,
for convenience only)
Bag(kind,no,content) & Iterator(kind,no) =
if (weight content) + (weight kind) >= maxBagWeight
then do bag (next kind) no content
if no < MaxBagNo
then iterator kind (no+1)
else return ()
else do
r <- getItem k
case r of
Nothing -> do
bag (next kind) no content
if no < MaxBagNo -- (***)
then iterator kind (no+1)
else return ()
Just itemWeight -> do
bag kind no (add content itemWeight)
iterator kind no
Item(k) & GetItem(k,x) = case k of
Large -> x := Just Large
...
GetItem(k,x) = x:= Nothing
NOTE:
First, I thought we could replace (***) by the 'default' rule
Iterator(kind, no) = return ()
which we put *after* the first rule.
However, this won't work because the 'small' iterator waiting for
the medium and large iterator may terminate prematurely.
A concrete implementation of the above can be found
here as part of the multisetrewrite library.
Elementary solution in Haskell using STM
> import IO
> import Data.IORef
> import Control.Concurrent
> import Control.Concurrent.STM
> import Control.Concurrent.Chan
We set up the item space by storing large, medium and small
items into their respective list. For each kind of items,
there's at most one thread trying to access the items.
Hence, using IORefs is fine.
> data ItemSpace = IS { large :: IORef [()]
> , medium :: IORef [()]
> , small :: IORef [()] }
We only store () values, the weights are fixed.
> largeWeight = 6
> mediumWeight = 4
> smallWeight = 2
The kind of items we support are either large, medium or small.
Done is a "dummy" item. We'll move from large to medium to small items.
Done simply indicates that we are done.
> data Item = Large | Medium | Small | Done deriving (Eq, Show)
> next :: Item -> Item
> next Large = Medium
> next Medium = Small
> next Small = Done
> itemWeight :: Item -> Int
> itemWeight Large = largeWeight
> itemWeight Medium = mediumWeight
> itemWeight Small = smallWeight
The procedure to fetch items. There may be concurrent threads
accessing either large, medium or small items. But there's
at most one thread for each kind.
> fetch :: ItemSpace -> Item -> IO (Maybe ())
> fetch is kind =
> let get m = do
> l <- readIORef m
> case l of
> (x:xs) -> do writeIORef m xs
> return (Just x)
> [] -> return Nothing
> in case kind of
> Large -> get (large is)
> Medium -> get (medium is)
> Small -> get (small is)
Each bag records its current weight and content.
The mode tells us which kind of items are currently allowed to
be put into the bag. The mode is protected by a TVar which allows us
to suspend until the large items are exhaustively put into the bag.
> data Bag = Bag { mode :: TVar Item
> , curWeight :: IORef Int
> , content :: IORef [Item] }
> printBag :: Bag -> IO ()
> printBag bag = do
> w <- readIORef (curWeight bag)
> c <- readIORef (content bag)
> putStrLn $ (show (w,c))
The maximum weight held by each bag is fixed.
> maxBagWeight = 16
We add an item to a bag by incrementing the bags weight
plus updating the content.
> add :: Bag -> Item -> IO ()
> add bag kind = do
> w <- readIORef (curWeight bag)
> c <- readIORef (content bag)
> writeIORef (curWeight bag) (w + itemWeight kind)
> writeIORef (content bag) (kind : c)
We check if a bag becomes full (over-weight) if we would add
another item.
> bagIsFull :: Bag -> Item -> IO (Bool)
> bagIsFull bag kind = do
> w <- readIORef (curWeight bag)
> return ((w + itemWeight kind) > maxBagWeight)
fillBag fills up (exhaustively) a bag with items of a given kind.
We 'unlock' the bag by 'incrementing' the mode if
- the bag is full
- no more items of this kind are left
> incrementMode :: Bag -> IO ()
> incrementMode bag =
> atomically $ do status <- readTVar (mode bag)
> writeTVar (mode bag) (next status)
> fillBag :: ItemSpace -> Bag -> Item -> IO ()
> fillBag is bag Done = error "not happening here"
> fillBag is bag kind =
> let loop = do
> b <- bagIsFull bag kind
> if b then incrementMode bag
> else do
> r <- fetch is kind
> case r of
> Nothing -> incrementMode bag
> Just _ -> do add bag kind
> loop
> in loop
We process (ie fill up) the list of bags sequentially.
For each bag we must wait untill its our turn.
The priorities are
large > medium > small
> waitMode :: Bag -> Item -> IO ()
> waitMode bag kind =
> atomically $ do status <- readTVar (mode bag)
> if status == kind then return ()
> else retry
> processBags :: ItemSpace -> Item -> [Bag] -> IO ()
> processBags is kind bs =
> mapM_ (\b -> do waitMode b kind
> fillBag is b kind) bs
For each kind we spawn a thread to fill up the list of bags.
> start :: ItemSpace -> [Bag] -> IO ()
> start is bs = do
> cnt <- atomically $ newTVar 0
> let spawn = \kind -> forkIO (do processBags is kind bs
> atomically $ do v <- readTVar cnt
> writeTVar cnt (v+1))
> spawn Large
> spawn Medium
> spawn Small
> atomically $ do v <- readTVar cnt
> if v == 3 then return ()
> else retry
testing
> createIS ln mn sn = do
> l <- newIORef [ () | x <- [1..ln]]
> m <- newIORef [ () | x <- [1..mn]]
> s <- newIORef [ () | x <- [1..sn]]
> return (IS {large = l, medium = m, small = s})
> emptyBag = do
> m <- atomically $ newTVar Large
> w <- newIORef 0
> c <- newIORef []
> return Bag {mode = m, curWeight = w, content = c}
> test largeNo medNo smallNo bagNo = do
> is <- createIS largeNo medNo smallNo
> bs <- mapM (\_ -> emptyBag) [1..bagNo]
> mapM_ (\b -> printBag b) bs
> putStrLn "Start"
> start is bs
> putStrLn "Done"
> mapM_ (\b -> printBag b) bs
Some observations
The STM solution can be improved. The thread filling up bags with large items doesn't need to check the status of the bag and the thread filling up small items only needs to be waken up once the 'medium' thread is done. We could achieve this behavior by replacing
mode :: TVar Item
with
mode1 :: TVar Item
mode2 :: TVar Item
That is, the 'medium' thread retries on mode1 and the 'small' thread retries on mode2.
In the join solution none of these optimizations are necessary, they are dealt with implicitly. For example, if Iterator(Small,someNo) is unable to fire a join-pattern, this method will be suspended (i.e. the method is put into the store). If the mode of a bag finally switches to small, this active bag will then in combination with the inactive (ie suspended) Iterator(Small,someNo) trigger the join-pattern.
No comments:
Post a Comment