Parsing with an MCAP Index
Welcome to the sixth and final part of our series on MCAP parsing. In the first four parts, we did all the work to parse certain topic messages from a bag file, going through the file sequentially. In the fifth part, we set up a lot of infrastructure to allow ourselves to instead parse the file using indexing so we can get our messages faster. In this part, we’ll finish the process and see how much faster our code is!
This series has combined a lot of ideas from several of our online courses. You can learn more about complex monad structures in Making Sense of Monads or Effectful Haskell. You can learn more about parsing in Solve.hs. You can get lifetime access to all our courses by purchasing MMH Complete!
The Plan
Let’s recap the most important items from the last part and go over our plan again. We defined a series of types to help us represent various record types, and parsers for them:
data FooterRec = ...
parseFooter' :: (MonadParsec Void ByteString m) => m (Word64, FooterRec)
data SummaryOffsetRec = ...
parseSummaryOffset :: (MonadFail m, MonadParsec Void ByteString m) => m (Word64, SummaryOffsetRec)
data ChunkIndexRec = ...
parseChunkIndex :: (MonadFail m, MonadParsec Void ByteString m) => m (Word64, ChunkIndexRec)
data MessageIndexRec = ...
parseMessageIndex' :: (MonadFail m, MonadParsec Void ByteString m) => m (Word64, MessageIndexRec)
We also defined a couple monad structures to capture our parser’s state:
data TopicState = TopicState
  { tsChannels :: HM.HashMap Word16 MsgChannel
  , tsSchemas :: HM.HashMap Word16 MsgSchema
  , tsMessages :: HM.HashMap (ByteString, ByteString) [MessageData]
  , tsDesiredTopics :: HS.HashSet ByteString
  , tsDesiredChannels :: HS.HashSet Word16
  } deriving (Show)
type MCAPReader a = StateT TopicState (ReaderT Handle (ExceptT ValidationError IO)) a
type MCAPParser a = ParsecT Void ByteString (StateT TopicState (ReaderT Handle (ExceptT ValidationError IO))) a
Finally, we defined a series of utility functions on these monads:
seek :: Integer -> MCAPReader ()
curPos :: MCAPReader Integer
mcapGuard :: String -> Bool -> MCAPReader ()
runMCAPParser :: MCAPParser (Word64, a) -> ByteString -> MCAPReader (Word64, a)
parseNextRecord :: RecordType -> MCAPParser (Word64, a) -> MCAPReader (Word64, a)
parseNextRecord' :: RecordType -> (Word64 -> MCAPParser (Word64, a)) -> MCAPReader (Word64, a)
runMCAPReader :: Handle -> TopicState -> MCAPReader a -> IO (Either ValidationError TopicState)
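If you skipped the previous part, it may help to picture seek and curPos as thin wrappers around the standard hSeek and hTell functions from System.IO. The sketch below is only an illustration under that assumption. It uses a simplified monad (just ReaderT Handle IO instead of the full MCAPReader stack), and the names HandleReader, seekSketch, curPosSketch, runHandleReader and demoPositions are hypothetical.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ReaderT, ask, runReaderT)
import System.IO (Handle, IOMode (ReadMode), SeekMode (AbsoluteSeek), hSeek, hTell, withBinaryFile)

-- Simplified stand-in for MCAPReader: only the Handle layer is kept here.
type HandleReader a = ReaderT Handle IO a

-- Move the handle to an absolute byte position in the file.
seekSketch :: Integer -> HandleReader ()
seekSketch pos = do
  hndl <- ask
  liftIO $ hSeek hndl AbsoluteSeek pos

-- Report the handle's current absolute byte position.
curPosSketch :: HandleReader Integer
curPosSketch = ask >>= liftIO . hTell

-- Run a reader action against an open handle.
runHandleReader :: Handle -> HandleReader a -> IO a
runHandleReader hndl action = runReaderT action hndl

-- Seek to two positions in a file and report where the handle ended up each time.
demoPositions :: FilePath -> IO (Integer, Integer)
demoPositions fp = withBinaryFile fp ReadMode $ \hndl ->
  runHandleReader hndl $ do
    seekSketch 10
    a <- curPosSketch
    seekSketch 42
    b <- curPosSketch
    return (a, b)
```

The real versions live in the larger monad stack, so they can also touch TopicState and throw a ValidationError.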
Our overall plan is:
- Parse the footer and use its data to jump to the start of the summary offset section
- Parse summary offsets for schemas, channels, and chunk indexes
- Parse summary records for schemas and channels to load them into TopicState
- Process chunk indexes, which direct us to message index records for our desired channels
- Process message indexes, which point us to individual messages for our desired topics
- Parse the messages into our TopicState
Let’s get started!
Step 1 - Parsing the Footer
Our entrypoint for the MCAPReader monad will be a function called parseUsingIndex (we’ll see how to enter this function later).
parseUsingIndex :: MCAPReader ()
The first order of business is to seek to the Footer’s location so we can parse the Footer record. The Footer has a fixed size and it is always the last record before the 8 “magic bytes” that conclude the file. This size is 29 bytes: 1 byte for the op code (record type), 8 bytes to define the content size, and 20 bytes for the 3 fields (Word64, Word64, Word32). Combined with the magic bytes, this means we want to find the file size and seek to 37 bytes before it:
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  footerStart <- seekToFooterStart
  ...
  where
    seekToFooterStart = do
      hndl <- ask
      sz <- liftIO $ hFileSize hndl
      seek (sz - 37)
      curPos
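The magic number 37 is easier to trust when it is spelled out. Here is a small standalone sketch of the arithmetic described above. The constant names and footerStartFor are hypothetical, and the Maybe result for files that are too small is an extra check that the code above does not perform.

```haskell
-- Sizes in bytes, following the breakdown in the text.
opcodeSize, lengthPrefixSize, footerFieldsSize, magicSize :: Integer
opcodeSize = 1
lengthPrefixSize = 8
footerFieldsSize = 8 + 8 + 4 -- Word64, Word64, Word32
magicSize = 8

-- Total size of the Footer record, including its op code and length prefix.
footerRecordSize :: Integer
footerRecordSize = opcodeSize + lengthPrefixSize + footerFieldsSize

-- Absolute position of the Footer record for a file of the given size.
-- Returns Nothing when the file is too small to hold a footer plus the magic bytes.
footerStartFor :: Integer -> Maybe Integer
footerStartFor fileSize
  | fileSize < trailer = Nothing
  | otherwise = Just (fileSize - trailer)
  where
    trailer = footerRecordSize + magicSize
```

For a 1000 byte file, this places the footer at byte 963.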
Now we use parseNextRecord and parseFooter' to get the Footer’s data, which includes the start of the summary offset section. We unpack this value and seek to its location:
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  footerStart <- seekToFooterStart
  (_, FooterRec summStart summOffStart _) <- parseNextRecord FileFooter parseFooter'
  seek (fromIntegral summOffStart)
  ...
  where
    seekToFooterStart = do
      hndl <- ask
      sz <- liftIO $ hFileSize hndl
      seek (sz - 37)
      curPos
Now we’re ready for step 2!
Step 2 - Parsing Summary Offsets
Recall that records in the summary section are grouped in blocks, and each summary offset record leads us to the start of the block for a particular op code. We’re interested in the summary records for schemas, channels and chunk indexes. So we want to keep parsing summary offsets until we have all three of these.
So we’ll build a helper function that will construct a map from op codes to summary offset records, but also track booleans for which of our desired types we’ve seen. This function also needs to track the number of bytes remaining in the summary offset section:
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  footerStart <- seekToFooterStart
  (_, FooterRec summStart summOffStart _) <- parseNextRecord FileFooter parseFooter'
  seek (fromIntegral summOffStart)
  ...
  where
    parseSummaryOffsets :: Word64 -> Bool -> Bool -> Bool -> M.Map RecordType SummaryOffsetRec -> MCAPReader (M.Map RecordType SummaryOffsetRec)
    ...
There are two base cases. If we are out of bytes, we return our accumulated map. If all three boolean flags are true, then we can also return the map, as we have what we need.
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  ...
  where
    parseSummaryOffsets :: Word64 -> Bool -> Bool -> Bool -> M.Map RecordType SummaryOffsetRec -> MCAPReader (M.Map RecordType SummaryOffsetRec)
    parseSummaryOffsets 0 _ _ _ acc = return acc
    parseSummaryOffsets _ True True True acc = return acc
    parseSummaryOffsets remBytes hasSchema hasChannel hasChunkIndex acc =
      ...
For the general case, we use parseSummaryOffset to parse the next record. We update our boolean flags depending on the op code, and we insert the new record into our map. Then we recurse:
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  ...
  where
    parseSummaryOffsets :: Word64 -> Bool -> Bool -> Bool -> M.Map RecordType SummaryOffsetRec -> MCAPReader (M.Map RecordType SummaryOffsetRec)
    parseSummaryOffsets 0 _ _ _ acc = return acc
    parseSummaryOffsets _ True True True acc = return acc
    parseSummaryOffsets remBytes hasSchema hasChannel hasChunkIndex acc = do
      mcapGuard ("Not enough bytes for summary offset: " <> show remBytes) (remBytes >= 26)
      (_, summ@(SummaryOffsetRec op _ _)) <- parseNextRecord SummaryOffset parseSummaryOffset
      let hasSchema' = hasSchema || op == Schema
      let hasChannel' = hasChannel || op == Channel
      let hasChunkIndex' = hasChunkIndex || op == ChunkIndex
      let mp = M.insert op summ acc
      parseSummaryOffsets (remBytes - 26) hasSchema' hasChannel' hasChunkIndex' mp
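To see the accumulation logic without any file handling, here is a pure sketch of the same loop over a list of records that have already been parsed. Everything in it is a simplified assumption: Op is a trimmed-down stand-in for RecordType, Offset stands in for SummaryOffsetRec, and collectOffsets is a hypothetical name.

```haskell
import qualified Data.Map as M

-- Hypothetical, trimmed-down op codes; the real RecordType has more constructors.
data Op = Schema | Channel | ChunkIndex | Statistics
  deriving (Show, Eq, Ord)

-- Hypothetical stand-in for SummaryOffsetRec: op code, group start, group length.
data Offset = Offset Op Integer Integer
  deriving (Show, Eq)

-- Walk a list of already-parsed offsets, stopping as soon as all three
-- wanted op codes have been seen or the input runs out.
collectOffsets :: [Offset] -> M.Map Op Offset
collectOffsets = go False False False M.empty
  where
    go _ _ _ acc [] = acc
    go True True True acc _ = acc
    go s c ci acc (o@(Offset op _ _) : rest) =
      go (s || op == Schema) (c || op == Channel) (ci || op == ChunkIndex)
        (M.insert op o acc) rest
```

Because of the early exit, any offsets that appear after the third wanted op code are never inspected.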
Now we’ll define a second helper. This will use the Maybe monad to extract the 3 records we care about.
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  ...
  where
    ...
    find3Offsets :: M.Map RecordType SummaryOffsetRec -> Maybe (SummaryOffsetRec, SummaryOffsetRec, SummaryOffsetRec)
    find3Offsets mp = do
      s <- M.lookup Schema mp
      c <- M.lookup Channel mp
      ci <- M.lookup ChunkIndex mp
      return (s, c, ci)
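As an aside, the same shape works in the Either monad if you would like the failure to name the missing key. This is a variation rather than the approach used in this series, and lookupAll3 is a hypothetical helper written generically over the key type.

```haskell
import qualified Data.Map as M

-- Like the Maybe version, but the Left value names the first missing key.
lookupAll3 :: (Ord k, Show k) => k -> k -> k -> M.Map k v -> Either String (v, v, v)
lookupAll3 k1 k2 k3 mp = do
  a <- need k1
  b <- need k2
  c <- need k3
  return (a, b, c)
  where
    -- Turn a failed lookup into a descriptive error.
    need k = maybe (Left ("missing offset for " <> show k)) Right (M.lookup k mp)
```

Both versions short-circuit on the first failed lookup. The only difference is how much the caller learns about the failure.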
Now back in the main line of the function, we’ll get the size of this section (subtract the summary offset start from the footer start position) and call our two helpers. If we don’t have all three offsets, we’ll throw a validation error showing what op codes we did find. Otherwise, we can unpack the 3 offsets and continue.
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  footerStart <- seekToFooterStart
  (_, FooterRec summStart summOffStart _) <- parseNextRecord FileFooter parseFooter'
  seek (fromIntegral summOffStart)
  -- Read Summary Offsets until we have Schema, Channel and ChunkIndex
  let sz = fromIntegral footerStart - summOffStart
  summOffMap <- parseSummaryOffsets sz False False False M.empty
  let offsets = find3Offsets summOffMap
  case offsets of
    Nothing -> throwError (ValidationError $ "Did not find the right summary offsets: " <> show (M.keys summOffMap))
    Just (schemaOffset, channelOffset, chunkIndexOffset) -> ...
  where
    parseSummaryOffsets :: Word64 -> Bool -> Bool -> Bool -> M.Map RecordType SummaryOffsetRec -> MCAPReader (M.Map RecordType SummaryOffsetRec)
    find3Offsets :: M.Map RecordType SummaryOffsetRec -> Maybe (SummaryOffsetRec, SummaryOffsetRec, SummaryOffsetRec)
Now we’ve got these summary offset records! It’s time to use them to start populating our topic state.
Step 3 - Summary Schemas and Channels
We’ll begin with the schemas and channels. These are fairly easy, since we can rely on previous code for the most part. Recall that we have the functions parseSchema' and parseChannel' for parsing individual schema and channel records. We need to update these type signatures to be more generic like so:
parseSchema' :: (MonadParsec Void ByteString m, MS.MonadState TopicState m) => m (Word64, Record)
parseChannel' :: (MonadFail m, MonadParsec Void ByteString m, MS.MonadState TopicState m) => m (Word64, Record)
These will work with the MCAPParser monad. They require MonadState TopicState because they’ll save the schemas and channels within our TopicState, rather than requiring us to do anything with the return values.
So we just need to write functions that will look through all the records in the summary block and parse them. Since the summary offset contains the start location and the byte length of all the records, we’ll seek to that location, and then use our tried and true method of parsing until we run out of bytes. Here’s a function to load all the schemas in a group:
loadSchemasForIndex :: SummaryOffsetRec -> MCAPReader ()
loadSchemasForIndex (SummaryOffsetRec _ groupStart groupLen) = do
  seek (fromIntegral groupStart)
  f groupLen
  where
    f 0 = return ()
    f rem = do
      (len, _) <- parseNextRecord Schema parseSchema'
      mcapGuard ("Summary schema is too long! " <> show len <> " " <> show rem) (len <= rem)
      f (rem - len)
And here’s the function for channels:
loadChannelsForIndex :: SummaryOffsetRec -> MCAPReader ()
loadChannelsForIndex (SummaryOffsetRec _ groupStart groupLen) = do
  seek (fromIntegral groupStart)
  f groupLen
  where
    f 0 = return ()
    f rem = do
      (len, _) <- parseNextRecord Channel parseChannel'
      mcapGuard ("Summary channel is too long! " <> show len <> " " <> show rem) (len <= rem)
      f (rem - len)
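These two functions differ only in the record type and the parser, so the byte-budget loop could be factored out. The sketch below shows one way that might look, written against any monad so it can be tried without a file. consumeGroup and popLength are hypothetical names. Instead of mcapGuard it returns an Either, and the zero-length check is an extra safeguard that is not in the code above.

```haskell
import Control.Monad.State (State, evalState, get, put)
import Data.Word (Word64)

-- Repeatedly run a step that reports how many bytes it consumed,
-- until the byte budget for the group is used up.
consumeGroup :: Monad m => String -> m Word64 -> Word64 -> m (Either String ())
consumeGroup label step = go
  where
    go 0 = return (Right ())
    go remaining = do
      len <- step
      if len == 0
        then return (Left (label <> " made no progress"))
        else if len > remaining
          then return (Left (label <> " is too long! " <> show len <> " " <> show remaining))
          else go (remaining - len)

-- Demo step: a fake "parser" that pops pre-set record lengths from a list.
popLength :: State [Word64] Word64
popLength = do
  xs <- get
  case xs of
    [] -> return 0
    (y : ys) -> put ys >> return y
```

In the real reader monad, the step would presumably be something like fst <$> parseNextRecord Schema parseSchema', with the error branch handled by mcapGuard.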
We simply call these from within parseUsingIndex like so:
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  footerStart <- seekToFooterStart
  (_, FooterRec summStart summOffStart _) <- parseNextRecord FileFooter parseFooter'
  seek (fromIntegral summOffStart)
  -- Read Summary Offsets until we have Schema, Channel and ChunkIndex
  let sz = fromIntegral footerStart - summOffStart
  summOffMap <- parseSummaryOffsets sz False False False M.empty
  let offsets = find3Offsets summOffMap
  case offsets of
    Nothing -> throwError (ValidationError $ "Did not find the right summary offsets: " <> show (M.keys summOffMap))
    Just (schemaOffset, channelOffset, chunkIndexOffset) -> do
      loadSchemasForIndex schemaOffset
      loadChannelsForIndex channelOffset
      ...
Now our internal TopicState is updated with the schemas and channels. This means we’re ready to parse messages, but we need to find where those messages are first!
Step 4 - Chunk Index Processing
Now we need to process each Chunk Index record. We’ll write a function for this, and it will be the last thing we need to call from parseUsingIndex:
processChunkIndexes :: SummaryOffsetRec -> MCAPReader ()
parseUsingIndex :: MCAPReader ()
parseUsingIndex = do
  footerStart <- seekToFooterStart
  (_, FooterRec summStart summOffStart _) <- parseNextRecord FileFooter parseFooter'
  seek (fromIntegral summOffStart)
  -- Read Summary Offsets until we have Schema, Channel and ChunkIndex
  let sz = fromIntegral footerStart - summOffStart
  summOffMap <- parseSummaryOffsets sz False False False M.empty
  let offsets = find3Offsets summOffMap
  case offsets of
    Nothing -> throwError (ValidationError $ "Did not find the right summary offsets: " <> show (M.keys summOffMap))
    Just (schemaOffset, channelOffset, chunkIndexOffset) -> do
      loadSchemasForIndex schemaOffset
      loadChannelsForIndex channelOffset
      processChunkIndexes chunkIndexOffset
  where
    ...
To fill out this function, we follow the same general pattern to loop through these records until we’ve exhausted the bytes:
processChunkIndexes :: SummaryOffsetRec -> MCAPReader ()
processChunkIndexes (SummaryOffsetRec _ groupStart groupLen) = do
  seek (fromIntegral groupStart)
  f groupLen
  where
    f 0 = return ()
    f rem = do
      (len, ci) <- parseNextRecord ChunkIndex parseChunkIndex
      ...
The trick is that processing each individual chunk index is more complicated after we’ve parsed it. Each chunk index refers to a single Chunk record and contains its initial position (we saved this in the cirChunkStartOffset field). However, the message index offsets are not absolute positions. They are relative to the start of the data section of the chunk. So we want to calculate where this location is (and it took me a bit of debugging to get it right!).
Prior to the data section in the chunk is the record header, which is 9 bytes. Then there are 32 fixed bytes, followed by the compression string. Then there are still 8 more bytes after that because the data section in the chunk is prefixed by the byte count. You can consult the Chunk record specification to see where these numbers are coming from. So putting it all together, we take the chunk start location, add 49 bytes, and then add the length of the compression string.
processChunkIndexes :: SummaryOffsetRec -> MCAPReader ()
processChunkIndexes (SummaryOffsetRec _ groupStart groupLen) = do
  seek (fromIntegral groupStart)
  f groupLen
  where
    f 0 = return ()
    f rem = do
      (len, ci) <- parseNextRecord ChunkIndex parseChunkIndex
      let chunkDataStart = cirChunkStartOffset ci + 49 + (fromIntegral $ BS.length (cirCompression ci))
      savedPosition <- curPos
      ... -- Process message index records
      seek savedPosition
You’ll also see that we save the current handle position. We’ll process the message index records and jump around to the messages. But we want to return to the current location (the location after parsing this chunk index) so that we can process the next chunk index afterward.
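Since the 49 in chunkDataStart is easy to get wrong, here is the same calculation as a standalone sketch with each piece named. The constant names and chunkDataStartFor are hypothetical, and the field breakdown in the comments is our reading of the Chunk record layout.

```haskell
import qualified Data.ByteString.Char8 as BC
import Data.Word (Word64)

-- Op code (1 byte) plus record content length (8 bytes).
recordHeaderSize :: Word64
recordHeaderSize = 1 + 8

-- Start time, end time, uncompressed size, uncompressed CRC,
-- and the length prefix of the compression string.
chunkFixedFieldsSize :: Word64
chunkFixedFieldsSize = 8 + 8 + 8 + 4 + 4

-- Byte count that sits in front of the chunk's record data.
recordsLengthPrefixSize :: Word64
recordsLengthPrefixSize = 8

-- Absolute position where the chunk's record data begins.
chunkDataStartFor :: Word64 -> BC.ByteString -> Word64
chunkDataStartFor chunkStart compression =
  chunkStart
    + recordHeaderSize
    + chunkFixedFieldsSize
    + fromIntegral (BC.length compression)
    + recordsLengthPrefixSize
```

With an empty compression string, a chunk that starts at byte 1000 has its record data at byte 1049.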
Now we have to look through all of our desired channels and see which of them are referred to in the cirMessageIndexOffsets of this chunk index:
processChunkIndexes :: SummaryOffsetRec -> MCAPReader ()
processChunkIndexes (SummaryOffsetRec _ groupStart groupLen) = do
  seek (fromIntegral groupStart)
  f groupLen
  where
    f 0 = return ()
    f rem = do
      (len, ci) <- parseNextRecord ChunkIndex parseChunkIndex
      let chunkDataStart = cirChunkStartOffset ci + 49 + (fromIntegral $ BS.length (cirCompression ci))
      savedPosition <- curPos
      chans <- MS.gets tsDesiredChannels
      let allMIO = cirMessageIndexOffsets ci
      forM_ chans $ \chanId -> when (HM.member chanId allMIO) $ do
        ...
      seek savedPosition
Now we get the offset for this channel in the map. This will give us the (absolute) location of the Message Index record for this channel. We’ll prepare a new function to process this message index:
processChunkIndexes :: SummaryOffsetRec -> MCAPReader ()
processChunkIndexes (SummaryOffsetRec _ groupStart groupLen) = do
  seek (fromIntegral groupStart)
  f groupLen
  where
    f 0 = return ()
    f rem = do
      (len, ci) <- parseNextRecord ChunkIndex parseChunkIndex
      let chunkDataStart = cirChunkStartOffset ci + 49 + (fromIntegral $ BS.length (cirCompression ci))
      savedPosition <- curPos
      chans <- MS.gets tsDesiredChannels
      let allMIO = cirMessageIndexOffsets ci
      forM_ chans $ \chanId -> when (HM.member chanId allMIO) $ do
        let chanOffset = allMIO HM.! chanId
        loadMessagesFromMessageIndex chanId chanOffset chunkDataStart
      seek savedPosition
      -- Move on to the next chunk index record in the group
      f (rem - len)
loadMessagesFromMessageIndex :: Word16 -> Word64 -> Word64 -> MCAPReader ()
loadMessagesFromMessageIndex chanId offset chunkDataStart = ...
This is all we need for processChunkIndexes. We’re getting closer to our goal! Now we need to process the message index record.
Steps 5 & 6 - Processing Message Indexes and Messages
You’ll note from above that we passed the channel ID, the offset of the message index, and the corresponding “chunk data start”. We begin this function by seeking to the message index location, parsing the message index, and ensuring that we are getting an index for the correct channel:
loadMessagesFromMessageIndex :: Word16 -> Word64 -> Word64 -> MCAPReader ()
loadMessagesFromMessageIndex chanId offset chunkDataStart = do
  seek (fromIntegral offset)
  (_, MessageIndexRec mirChanId recs) <- parseNextRecord MessageIndex parseMessageIndex'
  mcapGuard ("Mismatched channels for message index record: " <> show chanId <> " " <> show mirChanId) (mirChanId == chanId)
  ...
Now we loop through the record tuples in the message index record. Each of these contains a relative offset. We just need to seek to that offset, adding in the chunk data start location. Then we can use our message parser function!
loadMessagesFromMessageIndex :: Word16 -> Word64 -> Word64 -> MCAPReader ()
loadMessagesFromMessageIndex chanId offset chunkDataStart = do
  seek (fromIntegral offset)
  (_, MessageIndexRec mirChanId recs) <- parseNextRecord MessageIndex parseMessageIndex'
  mcapGuard ("Mismatched channels for message index record: " <> show chanId <> " " <> show mirChanId) (mirChanId == chanId)
  forM_ recs $ \(_, offset) -> do
    seek (fromIntegral $ chunkDataStart + offset)
    parseNextRecord' Message parseMessage'
parseMessage' :: (MonadParsec Void ByteString m, MS.MonadState TopicState m, MonadFail m) => Word64 -> m (Word64, Record)
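The seek targets in that loop are just the relative offsets shifted by the chunk data start. As a pure sketch, with the hypothetical name absoluteMessagePositions and the assumption that each index entry is a (log time, relative offset) pair:

```haskell
import Data.Word (Word64)

-- Each message index entry pairs a log time with an offset relative to the
-- start of the chunk's record data. Convert to Integer before adding so the
-- result matches what a handle seek expects and cannot wrap around.
absoluteMessagePositions :: Word64 -> [(Word64, Word64)] -> [Integer]
absoluteMessagePositions chunkDataStart entries =
  [fromIntegral chunkDataStart + fromIntegral off | (_, off) <- entries]
```

So with a chunk data start of 1049, entries at relative offsets 0 and 57 would be read from bytes 1049 and 1106.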
That’s all we need! Like our schema and channel parsers, parseMessage' does the work of saving the messages in our TopicState, so we don’t need to return or process anything. The path we started from parseUsingIndex is now complete!
Compression Disclaimer
As an important note, this code takes a shortcut. The approach given will only work with uncompressed chunks! The relative offset we used for the message index is actually the offset from the start of the uncompressed data. If the chunk’s data is compressed, we would have to grab that whole compressed string, decompress as much of it as we need, and then jump to particular locations within the decompressed bytestring. The code required to do this efficiently is much more intricate, as it requires us to manage counts on the streams of compressed and uncompressed data.
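To make the compressed case a little more concrete, here is a rough sketch of the last step only: pulling one record out of chunk data that has already been fully decompressed in memory. The decompression itself is not shown, and this is the simple, non-streaming approach rather than the efficient one described above. The names le64 and recordAt are hypothetical, and the sketch assumes each record is laid out as a 1 byte op code, an 8 byte little-endian length, and then the content.

```haskell
import qualified Data.ByteString as BS
import Data.Bits (shiftL, (.|.))
import Data.Word (Word64, Word8)

-- Decode a little-endian unsigned integer from up to 8 bytes.
le64 :: BS.ByteString -> Word64
le64 = BS.foldr (\b acc -> (acc `shiftL` 8) .|. fromIntegral b) 0

-- Given already-decompressed chunk data and a relative offset from a message
-- index, return (op code, record content) if a whole record is present there.
recordAt :: BS.ByteString -> Word64 -> Maybe (Word8, BS.ByteString)
recordAt chunkData off
  | off >= fromIntegral (BS.length chunkData) = Nothing
  | otherwise = do
      (op, afterOp) <- BS.uncons (BS.drop (fromIntegral off) chunkData)
      let (lenBytes, body) = BS.splitAt 8 afterOp
          len = le64 lenBytes
      if BS.length lenBytes < 8 || fromIntegral (BS.length body) < len
        then Nothing
        else Just (op, BS.take (fromIntegral len) body)
```

The content returned here is what would then be handed to a message parser, in place of seeking the file handle.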
This highlights an important tradeoff. By leaving the data uncompressed, it takes up more storage space and would take longer to send over a network if you need to download these bag files. But if the data is compressed, we might have to decompress a significant portion of it to find the few messages we care about, meaning we don’t see the same gains in bag read time. The particular needs of your application & systems will determine what choice makes sense.
Pulling it Together
Now we need an entrypoint to call parseUsingIndex. Our function will take the filepath, open a handle to it, and create the initial topic state, looking for the /turtle1/cmd_vel topic. We’ll invoke the parseUsingIndex function with runMCAPReader:
parseRecordsFromFileWithIndex :: FilePath -> IO ()
parseRecordsFromFileWithIndex fp = do
  startTime <- getCurrentTime
  handle <- openFile fp ReadMode
  let initialTs = TopicState HM.empty HM.empty HM.empty (HS.fromList ["/turtle1/cmd_vel"]) HS.empty
  result <- runMCAPReader handle initialTs parseUsingIndex
  ...
Then, in a reprise of previous code, we’ll loop through all the messages for our topic (they use the geometry_msgs/msg/Twist type from ROS2) and decode them all. Recall that in the previous part we wrote parseVelMsg for this type.
parseRecordsFromFileWithIndex :: FilePath -> IO ()
parseRecordsFromFileWithIndex fp = do
  startTime <- getCurrentTime
  handle <- openFile fp ReadMode
  let initialTs = TopicState HM.empty HM.empty HM.empty (HS.fromList ["/turtle1/cmd_vel"]) HS.empty
  result <- runMCAPReader handle initialTs parseUsingIndex
  case result of
    Left e -> print e
    Right finalTs -> do
      let velMessages = fromMaybe [] $ HM.lookup ("/turtle1/cmd_vel", "geometry_msgs/msg/Twist") (tsMessages finalTs)
      forM_ velMessages $ \(MessageData t1 t2 msg) -> do
        parseVelResult <- evalStateT (runParserT parseVelMsg "Velocity" msg) 0
        case parseVelResult of
          Left e -> print e
          Right s -> putStrLn $ "Parsed Velocity Message: " <> show t1 <> " " <> show t2 <> " " <> show s
  endTime <- getCurrentTime
  print (diffUTCTime endTime startTime)
And now we’re done! We added some timing features, so we can compare this to our previous run. It is much faster! The original takes around 0.075s. The new version takes about 0.001s. So it’s about 75x faster, a great speedup!
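If you want to repeat this kind of comparison on your own code, the timing pattern can be pulled into a small helper. This is a minimal sketch: timed is a hypothetical name, it measures wall-clock time with getCurrentTime just like the code above, and it does not force lazy results, so any work hidden in unevaluated thunks will not be counted.

```haskell
import Data.Time.Clock (NominalDiffTime, diffUTCTime, getCurrentTime)

-- Run an IO action and return its result along with the elapsed wall-clock time.
timed :: IO a -> IO (a, NominalDiffTime)
timed action = do
  start <- getCurrentTime
  result <- action
  end <- getCurrentTime
  return (result, diffUTCTime end start)
```

Wrapping both the sequential parser and the indexed parser in a helper like this would put the two measurements on the same footing. For single runs this short, though, expect some noise.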
Conclusion
This concludes our series on ROS2 bag parsing! Over the course of this long series, we were able to take an MCAP bag file, parse topic messages out of it, and then use the indexing features to do this even faster! We learned a ton about the MCAP format as well as CDR encoding. We saw how to compose parsers, even when they had stateful side effects like file seeking or saving message data.
With so much ground to cover, there wasn’t a lot of time in this series to go over the basics. If you want to solidify your Haskell skills so that you can write advanced code like this, the best way to do that is to take a look at some of our courses.
The most immediately useful for this series would be Solve.hs. Module 1 teaches you valuable loop structures that form the building blocks for all of our complicated functions in Haskell. Then Module 4 will teach you about parsing and Megaparsec! In between, you’ll also learn about Data Structures and Algorithms in Haskell!
Making Sense of Monads and Effectful Haskell are also great choices if the monad structures in the second half of the series are a bit confusing to you. You can grab both of these courses for a discount by purchasing the Effects Bundle!
Finally, if you want access to all our course content, past, present and future, you can get the MMH Complete bundle! This will include all the previous courses, as well as our other full length courses Haskell From Scratch, Practical Haskell, plus our mini-course on machine learning, Haskell Brain.