Merge pull request #73 from VictorDenisov/merge_gridfs

Merge gridfs
Victor Denisov 2016-11-25 20:20:02 -08:00 committed by GitHub
commit d43b63d305
3 changed files with 190 additions and 0 deletions

CHANGELOG.md

@@ -4,6 +4,9 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac
## [Unreleased] - unreleased
### Added
- GridFS implementation
### Fixed
- Write functions hang when the connection is lost.

Database/MongoDB/GridFS.hs (new file, 179 lines)

@@ -0,0 +1,179 @@
-- Author:
-- Brent Tubbs <brent.tubbs@gmail.com>
-- | MongoDB GridFS implementation
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, RankNTypes #-}
module Database.MongoDB.GridFS
( Bucket
, files, chunks
, File
, document, bucket
-- ** Setup
, openDefaultBucket
, openBucket
-- ** Query
, findFile
, findOneFile
, fetchFile
-- ** Delete
, deleteFile
-- ** Conduits
, sourceFile
, sinkFile
)
where
import Control.Applicative((<$>))
import Control.Concurrent(forkIO)
import Control.Monad(when)
import Control.Monad.IO.Class
import Control.Monad.Trans(MonadTrans, lift)
import Control.Monad.Trans.Control(MonadBaseControl)
import Control.Monad.Trans.Resource(MonadResource(..))
import Data.Conduit
import Data.Digest.Pure.MD5
import Data.Int
import Data.Tagged(Tagged, untag)
import Data.Text(Text, append)
import Data.Time.Clock(getCurrentTime)
import Database.MongoDB
import Prelude
import qualified Data.Bson as B
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import qualified Data.Conduit.Binary as BI
import qualified Data.Conduit.List as CL
defaultChunkSize :: Int64
-- ^ The default chunk size is 256 KiB
defaultChunkSize = 256 * 1024
-- | Files are stored in "buckets", each backed by a files collection and a
-- chunks collection. Open one with 'openDefaultBucket' or 'openBucket'.
data Bucket = Bucket {files :: Text, chunks :: Text}
openDefaultBucket :: (Monad m, MonadIO m) => Action m Bucket
-- ^ Open the default 'Bucket' (named "fs")
openDefaultBucket = openBucket "fs"
openBucket :: (Monad m, MonadIO m) => Text -> Action m Bucket
-- ^ Open a 'Bucket'
openBucket name = do
let filesCollection = name `append` ".files"
let chunksCollection = name `append` ".chunks"
ensureIndex $ (index filesCollection ["filename" =: (1::Int), "uploadDate" =: (1::Int)])
ensureIndex $ (index chunksCollection ["files_id" =: (1::Int), "n" =: (1::Int)]) { iUnique = True, iDropDups = True }
return $ Bucket filesCollection chunksCollection
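-- Example (a usage sketch, not part of the module): opening a custom bucket
-- inside an 'access' action; the host, database, and bucket names here are
-- hypothetical.
--
-- > pipe   <- connect (host "127.0.0.1")
-- > bucket <- access pipe master "testdb" (openBucket "photos")

-- | A file stored in GridFS: its metadata 'Document' together with the
-- 'Bucket' holding its chunks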
data File = File {bucket :: Bucket, document :: Document}
getChunk :: (Monad m, MonadIO m) => File -> Int -> Action m (Maybe S.ByteString)
-- ^ Get a chunk of a file
getChunk (File bucket doc) i = do
files_id <- B.look "_id" doc
result <- findOne $ select ["files_id" := files_id, "n" =: i] $ chunks bucket
let content = at "data" <$> result
case content of
Just (Binary b) -> return (Just b)
_ -> return Nothing
findFile :: (MonadIO m, MonadBaseControl IO m) => Bucket -> Selector -> Action m [File]
-- ^ Find files in the bucket
findFile bucket sel = do
cursor <- find $ select sel $ files bucket
results <- rest cursor
return $ File bucket <$> results
findOneFile :: MonadIO m => Bucket -> Selector -> Action m (Maybe File)
-- ^ Find one file in the bucket
findOneFile bucket sel = do
mdoc <- findOne $ select sel $ files bucket
return $ File bucket <$> mdoc
fetchFile :: MonadIO m => Bucket -> Selector -> Action m File
-- ^ Fetch one file in the bucket, failing if none matches
fetchFile bucket sel = do
doc <- fetch $ select sel $ files bucket
return $ File bucket doc
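-- Example (a sketch; the filename is hypothetical): files are looked up by
-- fields of their metadata documents in the files collection.
--
-- > pngs  <- findFile bucket ["filename" =: ("img.png" :: Text)]
-- > mfile <- findOneFile bucket ["filename" =: ("img.png" :: Text)]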
deleteFile :: (MonadIO m) => File -> Action m ()
-- ^ Delete a file from the bucket, including all of its chunks
deleteFile (File bucket doc) = do
files_id <- B.look "_id" doc
delete $ select ["_id" := files_id] $ files bucket
delete $ select ["files_id" := files_id] $ chunks bucket
putChunk :: (Monad m, MonadIO m) => Bucket -> ObjectId -> Int -> L.ByteString -> Action m ()
-- ^ Put a chunk in the bucket
putChunk bucket files_id i chunk = do
insert_ (chunks bucket) ["files_id" =: files_id, "n" =: i, "data" =: Binary (L.toStrict chunk)]
sourceFile :: (Monad m, MonadIO m) => File -> Producer (Action m) S.ByteString
-- ^ A producer for the contents of a file
sourceFile file = yieldChunk 0 where
yieldChunk i = do
mbytes <- lift $ getChunk file i
case mbytes of
Just bytes -> yield bytes >> yieldChunk (i+1)
Nothing -> return ()
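-- Example (a sketch): collecting a found file's contents with
-- 'Data.Conduit.List.consume' ('CL' above); the conduit runs inside the
-- 'Action'.
--
-- > Just f <- findOneFile bucket ["filename" =: ("img.png" :: Text)]
-- > parts  <- sourceFile f $$ CL.consume  -- parts :: [S.ByteString]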
-- Internal accumulator threaded through 'sinkFile' while a file is being written
data FileWriter = FileWriter
{ fwChunkSize :: Int64
, fwBucket :: Bucket
, fwFilesId :: ObjectId
, fwChunkIndex :: Int
, fwSize :: Int64
, fwAcc :: L.ByteString
, fwMd5Context :: MD5Context
, fwMd5acc :: L.ByteString
}
-- Finalize file, calculating md5 digest, saving the last chunk, and creating the file in the bucket
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
finalizeFile filename (FileWriter chunkSize bucket files_id i size acc md5context md5acc) = do
let md5digest = md5Finalize md5context (L.toStrict md5acc)
when (L.length acc > 0) $ putChunk bucket files_id i acc
  timestamp <- liftIO getCurrentTime
  let doc = [ "_id" =: files_id
            , "length" =: size
            , "uploadDate" =: timestamp
            , "md5" =: show md5digest
            , "chunkSize" =: chunkSize
            , "filename" =: filename
            ]
  insert_ (files bucket) doc
  return $ File bucket doc
-- Write as many chunks as can be written from the file writer
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
writeChunks (FileWriter chunkSize bucket files_id i size acc md5context md5acc) chunk = do
  -- Update the md5 context with as many complete md5 blocks as are available
  let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
  let md5acc_temp = md5acc `L.append` chunk
  let (md5context', md5acc') =
        if L.length md5acc_temp < md5BlockLength
          then (md5context, md5acc_temp)
          else let numBlocks = L.length md5acc_temp `div` md5BlockLength
                   (current, rest) = L.splitAt (md5BlockLength * numBlocks) md5acc_temp
               in (md5Update md5context (L.toStrict current), rest)
  -- Accumulate the incoming data; flush a chunk once a full one is available
  let size' = size + L.length chunk
  let acc_temp = acc `L.append` chunk
  if L.length acc_temp < chunkSize
    then return (FileWriter chunkSize bucket files_id i size' acc_temp md5context' md5acc')
    else do
      let (newChunk, acc') = L.splitAt chunkSize acc_temp
      putChunk bucket files_id i newChunk
      writeChunks (FileWriter chunkSize bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> Consumer S.ByteString (Action m) File
-- ^ A consumer that creates a file in the bucket and puts all consumed data in it
sinkFile bucket filename = do
files_id <- liftIO $ genObjectId
awaitChunk $ FileWriter defaultChunkSize bucket files_id 0 0 L.empty md5InitialContext L.empty
where
awaitChunk fw = do
mchunk <- await
case mchunk of
Nothing -> lift (finalizeFile filename fw)
Just chunk -> lift (writeChunks fw (L.fromStrict chunk)) >>= awaitChunk
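-- Example (a sketch, assuming an open 'pipe' and 'bucket' as above): streaming
-- a lazy 'L.ByteString' into a new GridFS file; the filename is hypothetical.
--
-- > f <- access pipe master "testdb" $
-- >        CL.sourceList (L.toChunks contents) $$ sinkFile bucket "upload.bin"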

mongoDB.cabal

@@ -30,16 +30,23 @@ Library
  , text
  , bytestring -any
  , containers -any
  , conduit
  , conduit-extra
  , mtl >= 2
  , cryptohash -any
  , network -any
  , parsec -any
  , random -any
  , random-shuffle -any
  , resourcet
  , monad-control >= 0.3.1
  , lifted-base >= 0.1.0.3
  , pureMD5
  , tagged
  , tls >= 1.2.0
  , time
  , data-default-class -any
  , transformers
  , transformers-base >= 0.4.1
  , hashtables >= 1.1.2.0
  , base16-bytestring >= 0.1.1.6
@@ -49,6 +56,7 @@ Library
  Exposed-modules: Database.MongoDB
                   Database.MongoDB.Admin
                   Database.MongoDB.Connection
                   Database.MongoDB.GridFS
                   Database.MongoDB.Query
                   Database.MongoDB.Transport
                   Database.MongoDB.Transport.Tls