package RPush(
        RPush(..),
        apply, tee,
        pass, passed,
        buffer, buffered,
        pipe, (»),
        sink, spew,
        fifoToRPush
    ) where

import FIFO

--@ \subsubsection{RPush}
--@
--@ \index{RPush@\te{RPush} (interface type)|textbf}
--@ The {\mbox{\te{RPush a}}} interface represents a stream that
--@ consumes values of type ``\te{a}'' only when ``pushed'' by a producer.
--@ The stream can also be ``cleared'' (or reset), forgetting all buffered
--@ state (which is what distinguishes it from the \te{Push} interface).
--@
--@ Modules with the RPush interface can be combined using the function
--@ \te{pipe},
--@ to model computations that comprise several steps,
--@ each of which may be buffered.
--@

infixr 12 »

--@ \begin{libverbatim}
--@ interface RPush #(type a);
--@     method Action push(a x1);
--@     method Action clear();
--@ endinterface: RPush
--@ \end{libverbatim}
-- push  : hand one value of type a to the stream.
-- clear : reset request; what is cleared is decided by each implementation.
interface RPush a =
    push  :: a -> Action
    clear :: Action

--@ Apply a function to the data in the stream.
--@ \begin{libverbatim}
--@ function RPush#(a) apply(function b f(a x1), RPush#(b) dst);
--@ \end{libverbatim}
-- Each pushed value is mapped through f before being pushed to dst;
-- clear is forwarded to dst as is.
apply :: (a -> b) -> RPush b -> RPush a
apply f dst =
    interface RPush
        push v = dst.push (f v)
        clear  = dst.clear

--@ Allow an action to peek at the
--@ stream.\footnote{Why \te{tee}? Run \mbox{\te{man tee}}.}
--@ \begin{libverbatim}
--@ function RPush#(a) tee(function Action a(a x1), RPush#(a) dst);
--@ \end{libverbatim}
-- A push performs both the observer `a v` and `dst.push v` within the
-- same action; clear is forwarded to dst only (the observer is not told).
tee :: (a -> Action) -> RPush a -> RPush a
tee a dst =
    interface RPush
        push v =
            action
                a v
                dst.push v
        clear = dst.clear

--@ Wrap the stream in a module (without buffering).
--@ \begin{libverbatim}
--@ module pass#(RPush#(a) dst)(RPush#(a));
--@ \end{libverbatim}
-- The module holds no state of its own: both methods delegate to dst.
pass :: (IsModule m c) => RPush a -> m (RPush a)
pass dst =
    module
        interface
            push v = dst.push v
            clear  = dst.clear

--@ Apply a function to the data in the stream
--@ and wrap the stream in a module (without buffering).
--@ \begin{libverbatim}
--@ module passed#(function b f(a x1), RPush#(b) dst)(RPush#(a));
--@ \end{libverbatim}
-- Composition of `apply f` and `pass`: the destination stream is the
-- second argument, exactly as the type signature shows.
passed :: (IsModule m c) => (a -> b) -> RPush b -> m (RPush a)
passed f = pass · apply f

--@ Wrap a stream in a module
--@ (with a FIFO buffer).
--@ \begin{libverbatim}
--@ module buffer#(RPush#(a) dst)(RPush#(a))
--@   provisos (Bits#(a, sa));
--@ \end{libverbatim}
-- push enqueues into the internal FIFO q; the rule "push" forwards
-- q.first to dst and dequeues it in one action.
-- clear clears both the internal FIFO and dst.
buffer :: (IsModule m c, Bits a sa) => RPush a -> m (RPush a)
buffer dst =
    module
        q :: FIFO a <- mkFIFO
        rules
            "push": when True ==> action { dst.push q.first; q.deq }
        interface
            push x = q.enq x
            clear = action { q.clear; dst.clear }

--@ Apply a function to the data in the stream
--@ and wrap the stream in a module
--@ (with a FIFO buffer).
--@ \begin{libverbatim}
--@ module buffered#(function b f(a x1), RPush#(b) dst)(RPush#(a))
--@   provisos (Bits#(a, sa));
--@ \end{libverbatim}
-- Composition of `apply f` and `buffer`: values are buffered as type a
-- (hence the Bits proviso on a) and f is applied on the way out to dst.
buffered :: (IsModule m c, Bits a sa) => (a -> b) -> RPush b -> m (RPush a)
buffered f = buffer · apply f

--@ A consumer that drops all data.
--@ \begin{libverbatim}
--@ module sink(RPush#(a));
--@ \end{libverbatim}
-- Both methods are empty actions.
sink :: (IsModule m c) => m (RPush a)
sink =
    module
        interface
            push _ = action {}
            clear = action {}

--@ A producer that always pushes junk on the given stream.
--@ \begin{libverbatim}
--@ module spew#(RPush#(a) dst)(Empty);
--@ \end{libverbatim}
-- The pushed value is `_`, i.e. left unspecified; dst.clear is never called.
spew :: (IsModule m c) => RPush a -> m Empty
spew dst =
    module
        rules
            "spew": when True ==> dst.push _

--@ Combine two streams (e.g., \te{pipe(spew, sink)}).
--@ \begin{libverbatim}
--@ function m#(b) pipe (function m#(b) f(a x1), m#(a) a)
--@   provisos (Monad#(m));
--@ \end{libverbatim}
-- (») is monadic bind with its arguments flipped: the consumer-taking
-- function is on the left and the module producing the consumer on the right.
(») :: (Monad m) => (a -> m b) -> m a -> m b
f » a = a `bind` f

-- Prefix-named form of (»).
pipe :: (Monad m) => (a -> m b) -> m a -> m b
pipe f a = f » a

--@ Wrap a \te{RPush} interface around a FIFO.
--@ \begin{libverbatim}
--@ function RPush#(a) fifoToRPush(FIFO#(a) q);
--@ \end{libverbatim}
-- push maps to q.enq and clear to q.clear; nothing dequeues here.
fifoToRPush :: FIFO a -> RPush a
fifoToRPush q =
    interface RPush
        push x = q.enq x
        clear = q.clear