MCAP Indexing
In the first 4 parts of this series (Part 1, Part 2, Part 3, Part 4), we’ve written a program that can parse specific topic information out of MCAP-based ROS2 bags. The Megaparsec library helped us a lot, but our current approach has a weakness; our program scans through the entire file sequentially. Bag files can be very large! So if we’re only interested in a few topics, it can be wasteful to parse through everything.
This is where indexes come in. In the final two parts of this series, we’ll see how we can use MCAP’s indexing features to quickly locate the messages we’re interested in. In this part, we’ll focus on the monadic structure of our code, because this will need to change quite a bit. We’ll also talk about the high-level details of the solution approach. In the next part, we’ll write the code to reach our goal!
In this article, we’ll employ some advanced monad techniques. If you still struggle with understanding monads, you should take our course, Making Sense of Monads! It will help you understand these concepts from the ground up. If you’re more advanced and looking for a challenge, Effectful Haskell is also a great course to try!
What is Indexing?
Indexing is a technique in many programming applications that allows us to query for certain data more quickly. The term originally comes from books, where any research or historical book has an “Index” section at the back where you can look through a sorted list of people, places and concepts and it will tell you the page(s) where they are mentioned.
In the programming context, you’ll hear about them most often with databases. If you have a “Users” table in your database, it will, by default, be very quick to look up a user with their “primary key”. But you’ll probably also add an “index” on their username or email address field, so that when they log in, you can quickly find their information without searching the entire users table.
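To make the idea concrete in Haskell terms, here is a minimal sketch. The User type and both lookup functions are made up for illustration and are not part of our MCAP code. It contrasts scanning a list with building a Data.Map keyed on the field we want to search by.

```haskell
import qualified Data.Map.Strict as M
import Data.List (find)

-- Hypothetical record type, used only for this illustration.
data User = User
  { userId :: Int
  , email  :: String
  } deriving (Show, Eq)

-- Without an index: inspect users one by one until a match is found.
findByEmailScan :: String -> [User] -> Maybe User
findByEmailScan e = find (\u -> email u == e)

-- Build the index once, keyed on the field we want to search by.
buildEmailIndex :: [User] -> M.Map String User
buildEmailIndex users = M.fromList [(email u, u) | u <- users]

-- With the index: a logarithmic-time map lookup instead of a full scan.
findByEmailIndexed :: String -> M.Map String User -> Maybe User
findByEmailIndexed = M.lookup
```

The trade-off is the same one MCAP makes: we pay some extra space (and a bit of work when writing) so that later reads can skip most of the data.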
With MCAP bags, we can use indexing to quickly locate data based on the topic or the timestamp, as these are the most common filtering mechanisms for parsing robotics data. Indexing makes use of a number of the record types we skipped over in previous parts. In these final articles, we’ll use four of them: Footer, Summary Offset, Chunk Index and Message Index.
The Problem Context
To measure our success with indexing, I created a new bag to parse using the Turtlesim node that comes with ROS2 by default. I used this to create a bag file with two topics: /turtle1/pose and /turtle1/cmd_vel.
You don’t need to know much about what these mean, except that our bag has around 12000 messages on the /turtle1/pose topic and only 22 messages on the /turtle1/cmd_vel topic. So if we want to search for the latter topic, we don’t want to process all the pose data.
The velocity topic message type is easy to parse, consisting only of 6 floating point values:
data VelocityMsg = VelocityMsg
  { linearX :: Double
  , linearY :: Double
  , linearZ :: Double
  , angularX :: Double
  , angularY :: Double
  , angularZ :: Double
  } deriving (Show, Eq)

parseVelMsg :: CDRParser VelocityMsg
parseVelMsg = do
  parseCdrHeader
  d1 <- parseCdrDoubleLE
  d2 <- parseCdrDoubleLE
  d3 <- parseCdrDoubleLE
  d4 <- parseCdrDoubleLE
  d5 <- parseCdrDoubleLE
  d6 <- parseCdrDoubleLE
  return $ VelocityMsg d1 d2 d3 d4 d5 d6
We can substitute this type into our code from previous parts, and use a timer to see how long it takes:
parseBareRecordsFromFile :: FilePath -> IO ()
parseBareRecordsFromFile fp = do
  -- Timer values get their own names so the lambda's t1/t2 below don't shadow them
  startTime <- getCurrentTime
  input <- BS.readFile fp
  let initialState = TopicState HM.empty HM.empty HM.empty (HS.fromList ["/turtle1/cmd_vel"]) HS.empty
  (result, st) <- runStateT (runParserT parseMcapFile' fp input) initialState
  case result of
    Left e -> print e
    Right recs -> do
      let velMessages = fromMaybe [] $ HM.lookup ("/turtle1/cmd_vel", "geometry_msgs/msg/Twist") (tsMessages st)
      forM_ velMessages $ \(MessageData t1 t2 msg) -> do
        parseVelResult <- evalStateT (runParserT parseVelMsg "Velocity" msg) 0
        case parseVelResult of
          Left e -> print e
          Right s -> putStrLn $ "Parsed Velocity Message: " <> show t1 <> " " <> show t2 <> " " <> show s
      -- forM_ recs $ \(_, rec) -> printRec rec
      -- print st
  endTime <- getCurrentTime
  print (diffUTCTime endTime startTime)
This will print out all 22 messages, and it takes around 0.08 seconds. That’s not a long time, but it’s also not trivial for a bag spanning a few minutes with only 2 topics. Real ROS bags can contain a LOT more data, and this would slow down our current approach! Our goal in these next two articles is to find these velocity messages much, much faster, in a way that isn’t so dependent on the total bag size.
New Record Types
In the prior part, the only record types we used were Schema, Channel, Chunk and Message. We could basically ignore everything else, simply consuming it based on the content length counts.
To use indexing, we’ll have to acquaint ourselves with 4 new record types:
- Footer
- Summary Offset
- Chunk Index
- Message Index
These records form a kind of data chain that will lead us to the messages we want. From the MCAP specification we can build types to represent the data in each of these records:
data FooterRec = FooterRec
  { summaryStart :: Word64
  , summaryOffsetStart :: Word64
  , summaryCrc :: Word32
  } deriving (Show, Eq)

data SummaryOffsetRec = SummaryOffsetRec
  { groupOpCode :: RecordType
  , groupStart :: Word64
  , groupLength :: Word64
  } deriving (Show, Eq)

data ChunkIndexRec = ChunkIndexRec
  { cirStartTime :: Word64
  , cirEndTime :: Word64
  , cirChunkStartOffset :: Word64
  , cirChunkLength :: Word64
  , cirMessageIndexOffsets :: HM.HashMap Word16 Word64
  , cirMessageIndexLength :: Word64
  , cirCompression :: ByteString
  , cirCompressedSize :: Word64
  , cirUncompressedSize :: Word64
  } deriving (Show, Eq)

data MessageIndexRec = MessageIndexRec
  { mirChannelId :: Word16
  , mirRecords :: [(Word64, Word64)]
  } deriving (Show, Eq)
Of course, we need code to parse these records, similar to what we wrote for previous record types. These parsers simply make use of primitives we’ve already written, so we won’t dwell on the implementation details. The parsing functions are included at the end of this article in the appendix for completeness.
What we want to understand next is where these record types fit in the context of the whole file.
File Structure Review
We introduced the MCAP file structure in the first part of this series, but it’s worth revisiting now that we’re looking at indexing. Here’s the overall structure:
<Magic>
<Header>
<Data section>
[<Summary section>]
[<Summary Offset section>]
<Footer>
<Magic>
The magic bytes, header and footer are mandatory. Per the specification, it is possible for an “empty” file to omit all the other sections, but having a single message will require having the “Data Section”. But the summary sections are optional, only included to facilitate indexing. The code we’ll write here will assume they exist.
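As a quick illustration of this layout, here is a small sketch that checks whether a file starts and ends with the magic bytes. The byte values are my reading of the MCAP specification (0x89 followed by "MCAP0" and "\r\n"), and the function name hasMagicAtBothEnds is made up for this example, so treat it as a sanity check rather than a full validator.

```haskell
import qualified Data.ByteString.Lazy as BL
import System.IO

-- The 8 magic bytes as I understand them from the MCAP specification.
mcapMagic :: BL.ByteString
mcapMagic = BL.pack [0x89, 0x4D, 0x43, 0x41, 0x50, 0x30, 0x0D, 0x0A]

-- Hypothetical helper: True if the file both starts and ends with the magic bytes.
hasMagicAtBothEnds :: FilePath -> IO Bool
hasMagicAtBothEnds fp = withBinaryFile fp ReadMode $ \hndl -> do
  size <- hFileSize hndl
  if size < 16
    then return False -- too small to hold two separate copies of the magic
    else do
      opening <- BL.hGet hndl 8
      -- Jump to 8 bytes before the end of the file
      hSeek hndl SeekFromEnd (-8)
      closing <- BL.hGet hndl 8
      return (opening == mcapMagic && closing == mcapMagic)
```

Notice that we read only 16 bytes no matter how large the bag is. The same trick of seeking relative to the end of the file is what makes the footer cheap to reach.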
The “Summary section” can contain schema and channel records. These are identical copies of the schemas and channels we could find in the data section. But the summary section also contains chunk index records, as well as other index record types we won’t discuss. The chunk index records then point to particular chunks in the data section. What is distinct and important about the summary section is that all records are grouped by type. All schema records will be in a single consecutive block, all channel records will be in a single consecutive block, and so on. This sets us up to understand summary offsets.
Each summary offset record gives the location of one of these record type blocks. So we should expect one summary offset for schema records in the summary section, one for channels, and one for chunk indexes.
Finally, we should consider where “Message Index” records live. These are in the data section, and they follow after “Chunk” records. Each message index points us to records for a particular channel in the preceding chunk.
I mentioned the “data chain” earlier, and now we can start to see how it works looking at the file structure and the fields of these particular records.
- The Footer points us to the summary offset section
- The summary offsets point us to the summary section
- We can read schemas and channels in the summary section
- Then chunk index records will point to message index records
- Message index records point us to messages for our channel
That’s the high-level idea, but let’s go into a little more detail, highlighting the exact fields we’ll use to accomplish this.
A Detailed Strategy
Before we go any further, it’s important to understand file seeking. Given a file handle and a numeric offset within the file, it is quick and efficient to tell the handle to “seek” to that offset, so that the next read operation (such as hGetChar) starts from that location. Many of the fields within our index records are file locations that we’ll seek to.
Knowing this, let’s lay out the details of exactly which fields we’ll use.
Step 1: Use the Footer’s summaryOffsetStart to jump to the beginning of the summary offset section.
Step 2: Parse records in the summary offset section until we have a SummaryOffsetRec corresponding to each of Schemas, Channels, and Chunk Indexes.
Step 3: Use the groupStart field in each SummaryOffsetRec to jump to each of these summary sections in turn. We’ll parse the schemas, and then parse the channels, loading them into our TopicState like we’ve already been doing.
Step 4: Process each Chunk Index record. The cirMessageIndexOffsets field provides us with a map from channel IDs to the locations of message index records. So we’ll loop through our desired channel IDs (populated in Step 3) and, if this chunk has an index for them, we’ll jump to that message index.
Step 5: Process the message index record for our desired channel. The mirRecords field contains an array with one tuple per message in the corresponding chunk. The second value of this tuple is an offset, relative to where the data starts in the chunk. This is the only relative offset in our program, so we need to bring forward the cirChunkStartOffset to make sense of this value.
Step 6: Using the chunk start offset and the offsets from the message index, jump to each Message record and parse it, storing its data like we have previously.
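The arithmetic behind Steps 5 and 6 can be sketched as a pure function. This is only an illustration under assumptions: the recordsStart parameter and the absoluteMessageOffsets name are hypothetical, and recordsStart stands for the absolute file position where the chunk’s record data begins (which we would work out from cirChunkStartOffset). My understanding is also that for compressed chunks the offsets refer to the decompressed data, so adding them directly to a file position would only make sense for uncompressed chunks.

```haskell
import Data.Word (Word16, Word64)

-- Trimmed-down copy of the MessageIndexRec type from this article.
data MessageIndexRec = MessageIndexRec
  { mirChannelId :: Word16
  , mirRecords   :: [(Word64, Word64)] -- (timestamp, offset relative to the chunk's data)
  } deriving (Show, Eq)

-- Hypothetical helper: turn each relative offset into an absolute file position.
-- 'recordsStart' is an assumption: the position where the chunk's records begin.
absoluteMessageOffsets :: Word64 -> MessageIndexRec -> [(Word64, Word64)]
absoluteMessageOffsets recordsStart idx =
  [ (timestamp, recordsStart + relOffset)
  | (timestamp, relOffset) <- mirRecords idx
  ]
```

For example, with a records start of 1000 and index entries at relative offsets 0 and 40, we would seek to positions 1000 and 1040.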
These steps will make more sense when we start writing code. We’ll write most of that code in the next part. First though, we have to come to grips with the fact that this whole jumping around business with “seek” is not consistent with how we’ve been parsing with Megaparsec so far!
Megaparsec vs. Seeking
We’ve written a lot of parsing functions so far with Megaparsec. However, this library assumes we are just walking straight through all the data. It does not support seeking at all, especially in the backwards direction.
Our indexing approach requires a lot of seeking, which makes sense. We hop around the file so that we don’t have to parse every individual byte.
The normal way to seek around a file in Haskell is to use a Handle object. Then we can use the hSeek function. It will occasionally also be important to use hTell to get our current position in the file. Then we can use hGet to read as many bytes as we specify into a ByteString.
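import System.IO (Handle, SeekMode(..), hSeek, hTell)
import Data.ByteString.Lazy (ByteString, hGet)

-- Note that hSeek also takes a SeekMode (e.g. AbsoluteSeek)
hSeek :: Handle -> SeekMode -> Integer -> IO ()
hTell :: Handle -> IO Integer
hGet :: Handle -> Int -> IO ByteString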
So instead of reading the file as a lazy bytestring upfront and passing that to Megaparsec, we want to open up a file handle and use that to move around the file and read bytes on demand. As long as IO is on our monad stack, we can do this.
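Here is a minimal, standalone sketch of that pattern using only System.IO and bytestring. The helper names readAt and demoSeek are made up for this example. It opens a handle, seeks to an absolute position, reads a few bytes, and then asks the handle where it ended up.

```haskell
import qualified Data.ByteString.Lazy as BL
import System.IO

-- Hypothetical helper: read 'len' bytes starting at absolute position 'pos'.
readAt :: Handle -> Integer -> Int -> IO BL.ByteString
readAt hndl pos len = do
  hSeek hndl AbsoluteSeek pos
  BL.hGet hndl len

-- Read 3 bytes starting at offset 4, then report the handle's new position.
demoSeek :: FilePath -> IO (BL.ByteString, Integer)
demoSeek fp = withBinaryFile fp ReadMode $ \hndl -> do
  bytes <- readAt hndl 4 3
  pos <- hTell hndl
  return (bytes, pos)
```

On a file whose bytes are 0 through 9, demoSeek returns the bytes 4, 5 and 6 along with position 7, since reading advances the handle. Everything before offset 4 is never touched.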
So can we still use the Parsec functions we’ve written so far in this series? Yes, we can! The solution to this lies in the parameterization we employed last time to use the same parsers with CDR. We just want to replace most of the Parser type signatures with something that captures the requirements of the function, such as MonadParsec and MonadFail. For example, here’s how we re-write the signature for parseArray:
parseArray :: (MonadFail m, MonadParsec Void ByteString m) => m (Word64, a) -> m (Word64, [a])
A special case occurs with our parsers for Schemas, Channels and Messages. These impact the TopicState value we have wrapped in a StateT layer of our Parser monad. We can also capture this with a monad class! However, we do need a different import for this:
import qualified Control.Monad.State as MS
parseSchema' :: (MonadParsec Void ByteString m, MS.MonadState TopicState m) => m (Word64, Record)
Now we can access the state using MS.get and MS.put here.
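If constraint-based signatures are new to you, this small sketch shows the idea in isolation. It uses a plain Int counter in place of TopicState, and recordSeen is a made-up stand-in for a parser that updates state. The point is that one definition runs unchanged in two different monad stacks.

```haskell
{-# LANGUAGE FlexibleContexts #-}
import Control.Monad.Reader (runReaderT)
import Control.Monad.State (MonadState, execState, execStateT, modify)

-- Hypothetical stand-in for a state-updating parser. It only demands *some*
-- monad with an Int state, and names no concrete stack.
recordSeen :: MonadState Int m => m ()
recordSeen = modify (+ 1)

-- Stack 1: the plain State monad.
runPure :: Int
runPure = execState (recordSeen >> recordSeen) 0

-- Stack 2: StateT over ReaderT over IO, shaped a bit like the reader monad we build below.
runInStack :: IO Int
runInStack = runReaderT (execStateT (recordSeen >> recordSeen >> recordSeen) 0) "env"
```

Here runPure evaluates to 2 and runInStack produces 3. Neither call site required any change to recordSeen, which is exactly what lets us reuse our existing record parsers inside a new stack.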
So our general approach will be to use a different IO-based monad to read out bytestrings containing record data. We can read all of a record’s data up front as a lazy bytestring, since each record header tells us the length of the content that follows. Then we pass that bytestring into a new invocation of a Megaparsec-based monad like we did with CDR parsing.
Let’s see how we construct the new monad.
Defining a New Monad
Our previous monad looked like this:
type Parser a = ParsecT Void ByteString (StateT TopicState IO) a
This parser has 3 features: parsing a bytestring, making stateful updates to the TopicState, and allowing IO for debugging.
Our desired monad doesn’t have parsec as a feature, but it still requires using TopicState and IO. However we have two new features.
First, we want to track a Handle. We will use this to seek around the file and read from it. This sounds stateful, but because IO handles the “state” part of the handle, we actually only need the handle wrapped in a Reader layer. This could give us a monad like this:
type MCAPReader a = StateT TopicState (ReaderT Handle IO) a
We’ll add one more layer to this though. The ParsecT gave us a graceful layer for MonadFail, causing it to return a Left value instead of Right at the end. The IO monad also has this, but it’s not as graceful, causing a program crash. So we’ll add an ExceptT layer. This will allow us to use throwError on a custom error type so that we can have more graceful error handling.
-- Deriving Exception this way requires the DeriveAnyClass extension
data ValidationError = ValidationError String
  deriving (Show, Eq, Exception)
type MCAPReader a = StateT TopicState (ReaderT Handle (ExceptT ValidationError IO)) a
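To see what the ExceptT layer buys us, here is a cut-down sketch of the same stack shape. MiniReader, failingStep and runMini are hypothetical names, with an Int state and a String environment standing in for TopicState and Handle so the example stays self-contained.

```haskell
import Control.Monad.Except (ExceptT, runExceptT, throwError)
import Control.Monad.Reader (ReaderT, runReaderT)
import Control.Monad.State (StateT, put, runStateT)

data ValidationError = ValidationError String
  deriving (Show, Eq)

-- Simplified stand-in for MCAPReader (Int instead of TopicState, String instead of Handle).
type MiniReader a = StateT Int (ReaderT String (ExceptT ValidationError IO)) a

-- Updates the state, then fails with our custom error type.
failingStep :: MiniReader ()
failingStep = do
  put 42
  throwError (ValidationError "bad record")

-- Peel the layers off from the outside in: StateT, then ReaderT, then ExceptT.
runMini :: MiniReader a -> IO (Either ValidationError (a, Int))
runMini action = runExceptT (runReaderT (runStateT action 0) "env")
```

Running runMini failingStep gives back Left (ValidationError "bad record") as an ordinary value, with no crash. One consequence of putting ExceptT at the bottom of the stack is that the Left carries no state, so the put 42 is discarded when the error is thrown. That seems acceptable for our purposes, since a failed parse leaves us with nothing useful to report anyway.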
We can further build another monad MCAPParser, which will put a ParsecT layer on top of MCAPReader. This will serve as the bridge between our new code and the old code.
type MCAPParser a = ParsecT Void ByteString (StateT TopicState (ReaderT Handle (ExceptT ValidationError IO))) a
Since it has ParsecT on top, we can call our existing functions with any function that has an MCAPParser signature! To see this in action, let’s start writing some utility functions for this monad.
Monad Utilities
So let’s wrap up this article by writing some functions for our new monad that we can use across the board. To start, we’ll write wrappers for hSeek and hTell, so that we don’t have to constantly use ask and liftIO in our code:
seek :: Integer -> MCAPReader ()
seek pos = do
  hndl <- ask
  liftIO (hSeek hndl AbsoluteSeek pos)

curPos :: MCAPReader Integer
curPos = do
  hndl <- ask
  liftIO (hTell hndl)
Next, let’s add some functions to handle failure cases. These resemble what we already have for guard’. We’ll use throwError on a ValidationError and include the current handle location as part of the error.
failParse :: String -> MCAPReader ()
failParse str = throwError (ValidationError str)

mcapGuard :: String -> Bool -> MCAPReader ()
mcapGuard str cond = do
  p <- curPos
  unless cond $ failParse (str <> " " <> show p)
Now let’s write a function to run a “Parser” from our “Reader” monad, given a bytestring. We use runParserT, and if the result is Left, we throw a ValidationError. Otherwise we return the result. We’ll be able to pass any of our existing parsers to this function!
runMCAPParser :: MCAPParser (Word64, a) -> ByteString -> MCAPReader (Word64, a)
runMCAPParser parser dataBytes = do
  parseResult <- runParserT parser "MCAP" dataBytes
  case parseResult of
    Left e -> throwError (ValidationError (show e))
    Right result -> return result
Now let’s get into the meat of how we use this. We want to generalize the process of parsing a record in our reader monad. The issue with runMCAPParser is that it requires the ByteString containing the body of the record, so we first need to load those bytes from the file. We’ll wrap it with another function, parseNextRecord'. This function takes a general parser function of type (Word64 -> MCAPParser (Word64, a)).
parseNextRecord' :: RecordType -> (Word64 -> MCAPParser (Word64, a)) -> MCAPReader (Word64, a)
parseNextRecord' = undefined
We need this extra Word64 parameter strictly for parseMessage, which needs the content length to parse the body. However, our other record parsers can rely on this version, which ignores the parameter:
parseNextRecord :: RecordType -> MCAPParser (Word64, a) -> MCAPReader (Word64, a)
parseNextRecord typ parser = parseNextRecord' typ (const parser)
So how do we actually parse the record? Remember we’ll use hGet to read a certain number of bytes. We start with 1 byte, for the record’s type. We’ll compare this to the input record type to make sure we’re parsing the expected record:
parseNextRecord' :: RecordType -> (Word64 -> MCAPParser (Word64, a)) -> MCAPReader (Word64, a)
parseNextRecord' typ parser = do
  hndl <- ask
  -- Read and check the 1-byte record type
  recordByte <- liftIO $ BS.hGet hndl 1
  (n1, recordType) <- runMCAPParser parseRecordType' recordByte
  mcapGuard ("Unexpected record type: " <> show recordType <> " " <> show typ) (recordType == typ)
  ...
Now we’ll do the same thing for the 8-byte record content length.
parseNextRecord' :: RecordType -> (Word64 -> MCAPParser (Word64, a)) -> MCAPReader (Word64, a)
parseNextRecord' typ parser = do
  hndl <- ask
  -- Read and check the 1-byte record type
  recordByte <- liftIO $ BS.hGet hndl 1
  (n1, recordType) <- runMCAPParser parseRecordType' recordByte
  mcapGuard ("Unexpected record type: " <> show recordType <> " " <> show typ) (recordType == typ)
  -- Read the 8-byte content length
  lenBytes <- liftIO $ BS.hGet hndl 8
  (n2, contentLength) <- runMCAPParser parseUint64LE lenBytes
  ...
Finally, we load the next bytes based on contentLength and pass them to our parser using runMCAPParser above!
parseNextRecord' :: RecordType -> (Word64 -> MCAPParser (Word64, a)) -> MCAPReader (Word64, a)
parseNextRecord' typ parser = do
  hndl <- ask
  -- Read and check the 1-byte record type
  recordByte <- liftIO $ BS.hGet hndl 1
  (n1, recordType) <- runMCAPParser parseRecordType' recordByte
  mcapGuard ("Unexpected record type: " <> show recordType <> " " <> show typ) (recordType == typ)
  -- Read the 8-byte content length
  lenBytes <- liftIO $ BS.hGet hndl 8
  (n2, contentLength) <- runMCAPParser parseUint64LE lenBytes
  -- Read the record body and hand it to the supplied parser
  contentBytes <- liftIO $ BS.hGet hndl (fromIntegral contentLength)
  (n3, record) <- runMCAPParser (parser contentLength) contentBytes
  return (n1 + n2 + n3, record)
An important note on parseNextRecord is that it depends on us already being in the right file location. So we’ll have to seek before calling this. But we’ll handle those details next time.
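To show the “seek first, then read the record framing” pattern without any of our monad machinery, here is a base-only sketch in plain IO. The names decodeUint64LE and readRawRecordAt are hypothetical, and this version decodes the length by hand instead of reusing our Megaparsec primitives. It does no validation of the opcode or the body.

```haskell
import qualified Data.ByteString.Lazy as BL
import Data.Bits (shiftL, (.|.))
import Data.Word (Word64, Word8)
import System.IO

-- Decode a little-endian unsigned 64-bit integer from the first 8 bytes.
decodeUint64LE :: BL.ByteString -> Word64
decodeUint64LE bytes =
  foldr (\b acc -> (acc `shiftL` 8) .|. fromIntegral b) 0 (BL.unpack (BL.take 8 bytes))

-- Hypothetical helper: seek to 'pos', then read one raw record as (opcode, body).
readRawRecordAt :: Handle -> Integer -> IO (Word8, BL.ByteString)
readRawRecordAt hndl pos = do
  hSeek hndl AbsoluteSeek pos
  -- 1 byte of record type followed by 8 bytes of content length
  header <- BL.hGet hndl 9
  if BL.length header /= 9
    then ioError (userError ("No complete record header at position " <> show pos))
    else do
      let opcode = BL.head header
          contentLength = decodeUint64LE (BL.drop 1 header)
      body <- BL.hGet hndl (fromIntegral contentLength)
      return (opcode, body)
```

If the position is off by even one byte, the “length” we decode will be garbage, which is why the real version checks the record type with mcapGuard before going any further.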
For now, let’s finish this article by writing a run function for our monad. This will dispatch to the run functions for our sub-monads like execStateT and runExceptT.
runMCAPReader :: Handle -> TopicState -> MCAPReader a -> IO (Either ValidationError TopicState)
runMCAPReader handle ts parser = runExceptT (runReaderT (execStateT parser ts) handle)
Now we have all the tools we’ll need for next time!
Conclusion
In this article, we learned more about the structure of an MCAP bag, particularly how various record types facilitate indexing so we can access desired topic data more quickly. We also built a new monad stack to use to help us jump around the file more quickly. In the next and final part of this ROS2 series, we’ll implement the remaining details of using these index records to find our topic data!
Up until now, this series has focused on parsing mechanics. But this article centered more on monad composition, which can be a tricky topic. To learn more about monads, including how to build useful monad stacks, you can take a look at 2 of our courses. Making Sense of Monads is geared more towards beginners, and Effectful Haskell is a more advanced course for those trying to take their understanding of monads to the next level. You can get both of these by purchasing our Effects Bundle!
Appendix: Parsing New Record Types
Note how these use the generalized monad structure we discussed in this article (MonadParsec, MonadFail).
parseFooter' :: (MonadParsec Void ByteString m) => m (Word64, FooterRec)
parseFooter' = do
  (n1, summaryStart) <- parseUint64LE
  (n2, summaryOffsetStart) <- parseUint64LE
  (n3, summaryCrc) <- parseUint32LE
  return (n1 + n2 + n3, FooterRec summaryStart summaryOffsetStart summaryCrc)

parseSummaryOffset :: (MonadFail m, MonadParsec Void ByteString m) => m (Word64, SummaryOffsetRec)
parseSummaryOffset = do
  (n1, op) <- parseRecordType'
  (n2, gs) <- parseUint64LE
  (n3, gl) <- parseUint64LE
  return (n1 + n2 + n3, SummaryOffsetRec op gs gl)

parseChunkIndex :: (MonadFail m, MonadParsec Void ByteString m) => m (Word64, ChunkIndexRec)
parseChunkIndex = do
  (n1, startT) <- parseTimestamp
  (n2, endT) <- parseTimestamp
  (n3, chunkStart) <- parseUint64LE
  (n4, chunkLen) <- parseUint64LE
  (n5, offsets) <- parseMap parseUint16LE parseUint64LE
  (n6, indexLength) <- parseUint64LE
  (n7, compression) <- parseString
  (n8, compressedSize) <- parseUint64LE
  (n9, uncompressedSize) <- parseUint64LE
  let totalLen = n1 + n2 + n3 + n4 + n5 + n6 + n7 + n8 + n9
  let rec = ChunkIndexRec startT endT chunkStart chunkLen offsets indexLength compression compressedSize uncompressedSize
  return (totalLen, rec)

parseMessageIndex' :: (MonadFail m, MonadParsec Void ByteString m) => m (Word64, MessageIndexRec)
parseMessageIndex' = do
  (n1, cid) <- parseUint16LE
  (n2, recs) <- parseArray (parseTuple parseUint64LE parseUint64LE)
  return (n1 + n2, MessageIndexRec cid recs)