fix correct finalizing of last block
This commit is contained in:
parent
6431062ea7
commit
bb3e66073f
1 changed files with 42 additions and 26 deletions
|
@ -3,7 +3,7 @@
|
|||
-- | MongoDB GridFS implementation
|
||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, RankNTypes #-}
|
||||
|
||||
module Database.MongoDB.GridFS
|
||||
module Database.MongoDB.GridFS
|
||||
( Bucket
|
||||
, files, chunks
|
||||
, File
|
||||
|
@ -24,11 +24,11 @@ module Database.MongoDB.GridFS
|
|||
where
|
||||
|
||||
import Control.Applicative((<$>))
|
||||
import Control.Concurrent(forkIO)
|
||||
|
||||
import Control.Monad(when)
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans(MonadTrans, lift)
|
||||
import Control.Monad.Trans.Resource(MonadResource(..))
|
||||
|
||||
import Data.Conduit
|
||||
import Data.Digest.Pure.MD5
|
||||
import Data.Int
|
||||
|
@ -40,13 +40,17 @@ import Prelude
|
|||
import qualified Data.Bson as B
|
||||
import qualified Data.ByteString as S
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified Data.Conduit.Binary as BI
|
||||
import qualified Data.Conduit.List as CL
|
||||
|
||||
|
||||
defaultChunkSize :: Int64
|
||||
-- ^ The default chunk size is 256 kB
|
||||
defaultChunkSize = 256 * 1024
|
||||
|
||||
-- magic constant for the
|
||||
md5BlockSizeInBytes :: Int
|
||||
md5BlockSizeInBytes = 64
|
||||
|
||||
|
||||
data Bucket = Bucket {files :: Text, chunks :: Text}
|
||||
-- ^ Files are stored in "buckets". You open a bucket with openDefaultBucket or openBucket
|
||||
|
||||
|
@ -125,12 +129,12 @@ data FileWriter = FileWriter
|
|||
, fwAcc :: L.ByteString
|
||||
, fwMd5Context :: MD5Context
|
||||
, fwMd5acc :: L.ByteString
|
||||
}
|
||||
}
|
||||
|
||||
-- Finalize file, calculating md5 digest, saving the last chunk, and creating the file in the bucket
|
||||
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
|
||||
finalizeFile filename (FileWriter chunkSize bucket files_id i size acc md5context md5acc) = do
|
||||
let md5digest = md5Finalize md5context (L.toStrict md5acc)
|
||||
let md5digest = finalizeMD5 md5context (L.toStrict md5acc)
|
||||
when (L.length acc > 0) $ putChunk bucket files_id i acc
|
||||
timestamp <- liftIO $ getCurrentTime
|
||||
let doc = [ "_id" =: files_id
|
||||
|
@ -140,30 +144,42 @@ finalizeFile filename (FileWriter chunkSize bucket files_id i size acc md5contex
|
|||
, "chunkSize" =: chunkSize
|
||||
, "filename" =: filename
|
||||
]
|
||||
_ <- liftIO $ putStrLn $ "md5 : " ++ (show md5digest)
|
||||
insert_ (files bucket) doc
|
||||
return $ File bucket doc
|
||||
|
||||
-- finalize the remainder and return the MD5Digest.
|
||||
finalizeMD5 :: MD5Context -> S.ByteString -> MD5Digest
|
||||
finalizeMD5 ctx rest =
|
||||
md5Finalize ctx2 (S.drop lu rest) -- can only handle max md5BlockSizeInBytes length
|
||||
where
|
||||
l = S.length rest
|
||||
r = l `mod` md5BlockSizeInBytes
|
||||
lu = l - r
|
||||
ctx2 = md5Update ctx (S.take lu rest)
|
||||
|
||||
-- Write as many chunks as can be written from the file writer
|
||||
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
|
||||
writeChunks (FileWriter chunkSize bucket files_id i size acc md5context md5acc) chunk = do
|
||||
-- Update md5 context
|
||||
let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
|
||||
let md5acc_temp = (md5acc `L.append` chunk)
|
||||
let (md5context', md5acc') =
|
||||
if (L.length md5acc_temp < md5BlockLength)
|
||||
then (md5context, md5acc_temp)
|
||||
else let numBlocks = L.length md5acc_temp `div` md5BlockLength
|
||||
(current, rest) = L.splitAt (md5BlockLength * numBlocks) md5acc_temp
|
||||
in (md5Update md5context (L.toStrict current), rest)
|
||||
-- Update chunks
|
||||
let size' = (size + L.length chunk)
|
||||
let acc_temp = (acc `L.append` chunk)
|
||||
if (L.length acc_temp < chunkSize)
|
||||
then return (FileWriter chunkSize bucket files_id i size' acc_temp md5context' md5acc')
|
||||
else do
|
||||
let (chunk, acc') = L.splitAt chunkSize acc_temp
|
||||
putChunk bucket files_id i chunk
|
||||
writeChunks (FileWriter chunkSize bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
|
||||
writeChunks (FileWriter chunkSize bucket files_id i size acc md5context md5acc) chunk =
|
||||
do
|
||||
-- Update md5 context
|
||||
let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
|
||||
let md5acc_temp = (md5acc `L.append` chunk)
|
||||
let (md5context', md5acc') =
|
||||
if (L.length md5acc_temp < md5BlockLength)
|
||||
then (md5context, md5acc_temp)
|
||||
else let numBlocks = L.length md5acc_temp `div` md5BlockLength
|
||||
(current, rest) = L.splitAt (md5BlockLength * numBlocks) md5acc_temp
|
||||
in (md5Update md5context (L.toStrict current), rest)
|
||||
-- Update chunks
|
||||
let size' = (size + L.length chunk)
|
||||
let acc_temp = (acc `L.append` chunk)
|
||||
if (L.length acc_temp < chunkSize)
|
||||
then return (FileWriter chunkSize bucket files_id i size' acc_temp md5context' md5acc')
|
||||
else do
|
||||
let (chunk, acc') = L.splitAt chunkSize acc_temp
|
||||
putChunk bucket files_id i chunk
|
||||
writeChunks (FileWriter chunkSize bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
|
||||
|
||||
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> Consumer S.ByteString (Action m) File
|
||||
-- ^ A consumer that creates a file in the bucket and puts all consumed data in it
|
||||
|
|
Loading…
Reference in a new issue