Link Search Menu Expand Document

Introduce BIO: A Simple Streaming Abstraction

2021-04-20 20:43:14 +0000 by Dong

流式 IO 是一个古老的想法:按块读取数据,对每个块进行处理并将其写入输出,以使程序使用的内存大小保持在较低的水平。例如:

cat foo.txt | gzip | base64 | tee foo.gz

在上面的UNIX命令中,按块读取文件foo.txt,执行 gzip 和 base64 转换,并通过管道传递到 foo.gzstdout 。使用 Haskell 处理分块数据时,我们希望获得类似的语法,这就是流抽象的起点。

流式代数数据类型

部分闭包

Z-Data 的解析器部分 中,我们描述了一个可恢复的解析器,该解析器可以分块消费输入:

> P.parse' dateParser "2020-12-12"
Date 2020 12 12
> P.parseChunk dateParser "2020-"
Partial _
> let (P.Partial f) = P.parseChunk dateParser "2020-"
> let (P.Partial f') = f "05-05"    -- incrementally provide input
> f' ""                             -- push empty chunk to signal EOF
Success Date {year = 2020, month = 5, day = 5}

实现可恢复解析的核心类是 Result

data Result e r
    = Success r !Bytes
    | Failure e !Bytes
    | Partial (V.Bytes -> Result e r)

Partial 构造函数包含一个捕获了上一个块的解析状态的闭包,可以将其应用于下一个块以产生新的结果。现在让我们考虑是否可以将这种构造应用于 IO (或任意 monad ),以下定义来自 streaming 包:

data Stream f m r = Step !(f (Stream f m r))
                  | Effect (m (Stream f m r))
                  | Return r

data Of a b = !a :> b

流单子

streaming 中,Stream (Of a) IO () 用于表示 IO 流,通过一些 monad 原语,您可以像这样构造 IO 流:

-- Stream monad will provide some primitives to create monadic value, e.g.
-- yield :: Monad m => a -> Stream (Of a) m ()
-- yield a = Step (a :> Return ())
-- instance (MonadIO m, Functor f) => MonadIO (Stream f m) where
--   liftIO = Effect . fmap Return . liftIO

foo :: Stream (Of a) IO ()
foo = do
    yield 1
    yield 2
    lift readLn >>= yield

StreamMonad 实例中,foo 的值现在变成了一条 Stream ADT 的链:

Step (1 :> Step (2 :>  Effect (\ x -> Step x :> Return ()) <$> readLn))

现在,如果我们提供迭代此ADT的函数,则可以处理该流。这种函数通常称为解释器,是free monad 设计模式 中的术语。例如, streaming 提供了自己的 foldrM 解释器来处理 Stream 结构:

foldrM :: Monad m => (a -> m r -> m r) -> Stream (Of a) m r -> m r
foldrM step = loop where
  loop stream = case stream of
    Return r       -> return r
    Effect m       -> m >>= loop        -- This is where IO effects happened!
    Step (a :> as) -> step a (loop as)

The Magic Pipes

在 hackage 里有一些包将 free monad 技术推向了极限,例如pipes 提供了一种相当难以理解的核心ADT类型:

data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r )
    | Respond b  (b' -> Proxy a' a b' b m r )
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r

有了这个复杂的类型,管道可以提供更多有趣的原语,例如 await>-> 。 例如 do x <- await; y <- await; return (x, y) 变为:

Request () (\ x -> Request () (\ y -> Pure (x, y)))

一种 pipes 所使用的技术是使用 Void 类型来消除某些类型下的某些构造函数,同时仍保持可组合性:

-- | type with no constructors
type X = Void

-- | 'Effect's neither 'Pipes.await' nor 'Pipes.yield'
type Effect = Proxy X () () X
-- | 'Producer's can only 'Pipes.yield'
type Producer b = Proxy X () () b
-- | 'Pipe's can both 'Pipes.await' and 'Pipes.yield'
type Pipe a b = Proxy () a () b
-- | 'Consumer's can only 'Pipes.await'
type Consumer a = Proxy () a () X

回顾

Free monad is powerful, but hard to use

Free monad 很强大,但是难以使用

Free monad 可以为您提供尽可能多的原语,并且您可以选择不同的解释器运行他,但是因为以下几个层面的原因导致它很难被使用:

  • Free monad 很难被理解,您必须非常仔细地阅读 monad 实例,才能理解这些原语是如何工作的
  • Monad transformers 也有同样的问题,因为现在每个基础的 monad 操作都需要被提升
  • 很难通过编译器进行优化,因为现在每个操作都成为 ADT 的构造函数,并经常导致更多的内存分配

用于流的 free monad 构造器还需要提供一组不同的 combinators ,例如 mapMfoldM ,它们与 Control.Monad 不兼容。

其他语言是如何处理流的

有趣的是,发现大多数面向对象语言都以一种非常简单的方式解决了此问题,例如javascript。

// from node.js example
const fs = require('fs');
const zlib = require('zlib');
const r = fs.createReadStream('file.txt');

const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

// or you can manually connect streams like this:
r.on('data', (chunk) => { z.write(chunk); });
z.on('data', (chunk) => { w.write(chunk); });

在面向对象编程的视角中,流节点是一个对象,具有一种接收块并写入下游内部回调方法,仅此而已。这种模式有一些缺点:

  • Stream 节点失去了控制,例如您没有办法在不接触 source 的情况下停止中间节点中的流处理。这是所有基于回调的API的 控制反转 问题。
  • Stream 节点现在变成了可变的有状态的对象,这在 Haskell 中是不自然的。

BIO 介绍

Z-IO v0.8中,我们为简化流处理引入了一种新的 BIO 类型,其具有三个设计目标:

  • 提供一种简单的可组合的类型
  • 没有 transformer ; 没有提升。
  • 使得应用和 BIO 库的开发都更容易

结果就是我们提供了一种专注于 callback transformation 的类型:

-- A bio node receives a callback, returns a new callback to be called from upstream.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- A Source doesn't consume any meaningful input
type Source a = BIO Void a
-- A Sink doesn't produce any meaningful output
type Sink a = BIO a Void

-- | A pattern for more meaningful matching.
pattern EOF :: Maybe a
pattern EOF = Nothing

例如,使用 BIO 实现 zlib 节点:

compressBIO :: ZStream -> BIO V.Bytes V.Bytes
compressBIO zs = \ callback mbs ->
    case mbs of
        Just bs -> do
            -- feed input chunk to ZStream
            set_avail_in zs bs (V.length bs)
            let loop = do
                    oavail :: CUInt <- withCPtr zs $ \ ps -> do
                        -- perform deflate and peek output buffer remaining
                        throwZlibIfMinus_ (deflate ps (#const Z_NO_FLUSH))
                        (#peek struct z_stream_s, avail_out) ps
                    when (oavail == 0) $ do
                        -- when output buffer is full,
                        -- freeze chunk and call the callback
                        oarr <- A.unsafeFreezeArr =<< readIORef bufRef
                        callback (Just (V.PrimVector oarr 0 bufSiz))
                        newOutBuffer
                        loop
            loop
        _ -> ... similar to above, with no input chunk and Z_FINISH flag

当实现 Source 时,您只需忽略 EOF 参数,并在准备好新块后调用回调。

-- | Turn a `IO` action into 'Source'
sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
sourceFromIO io = \ k _ ->
    let loop = io >>= \ x ->
            case x of
                Just _ -> k x >> loop   -- you should loop inside a Source
                _      -> k EOF
    in loop

您应该假设 EOF 参数仅给出一次,因此经常需要循环。 类似于 SourceSink 不需要写任何输出,直到最后的 EOF

sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
sinkToIO f = \ k ma ->
    case ma of
        Just a -> f a
        _ -> k EOF

组合 BIO 并运行

组合 BIO 很简单:您可以使用函数组合运算符 (.) 来连接 BIO ,因为它只是一个回调的转换:

import Z.Data.CBytes    (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}

    withResource (initSourceFromFile origin) $ \ src ->
        withResource (initSinkToFile target) $ \ sink ->
            runBIO_ $ src . base64Enc . zlibCompressor . sink

上面的代码类似于命令行 cat origin | base | gzip > target ,且 runBIO_ 的定义很简单:

-- | Discards a value, used as the callback to `Sink`.
discard :: a -> IO ()
discard _ = return ()

runBIO_ :: HasCallStack => BIO inp out -> IO ()
runBIO_ bio = bio discard EOF

结论

Hackage 上的流处理库很多,其中大多数都是围绕 free monad 模式设计的。在 Z-IO 中,我们基于回调转换引入了一种新的更简单的设计,该设计使得编写流处理器和应用程序都变得更简单。当然,没有什么是银弹。 Z-IO 中的 BIO 类型也有局限性,例如,下游节点不使用 IO 状态的时候是没有办法停止 source 唤起整个回调链路的,并且整个状态管理现在会依赖于 IO ,而不是用户提供的 state monad。