Schemas, Channels & Messages in MCAP

Welcome to part 3 of our series on parsing ROS bags in the MCAP format. In Part 1, we went over the basics of ROS and MCAP, and wrote some simple parsers. In Part 2, we filled out our parser enough to see the complete record structure of our bag. In this part, we’re going to filter messages so that we only get those messages that are associated with particular topics. This will leave us with the actual data inside those messages, which we’ll parse in the next part!

To learn more about parsing, you should sign up for our course Solve.hs. The 4th module will teach you important Haskell parsing techniques, ranging from regular expressions to the Megaparsec library we’re using in this series.

Message Structure

Since we’ll start looking in depth at Message records in this part, let’s look at their structure:

Bytes | Name | Type
2 | channel_id | uint16
4 | sequence | uint32
8 | log_time | Timestamp
8 | publish_time | Timestamp
N | data | Bytes

The data is, of course, the raw payload for this message. The two timestamps tell us when the message was published (publish_time) and when it was recorded (log_time). We keep both because filtering messages by time is a common operation. We will ignore the sequence number for this example.
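To make the layout concrete, here is a minimal sketch that splits the content of a Message record into its fixed-size fields and the payload, using only the bytestring package rather than our Megaparsec primitives. The names MessageHeader, leWord and splitMessage are hypothetical and are not part of the parser we build in this series. The sketch assumes all integers are little-endian, which matches the parseUint16LE-style primitives we have been using.

```haskell
import qualified Data.ByteString as BS
import Data.Bits (shiftL, (.|.))
import Data.Word (Word16, Word32, Word64)

-- Hypothetical container for the fixed-size fields of a Message record.
data MessageHeader = MessageHeader
  { mhChannelId   :: Word16
  , mhSequence    :: Word32
  , mhLogTime     :: Word64
  , mhPublishTime :: Word64
  } deriving (Show, Eq)

-- Interpret a ByteString as an unsigned little-endian integer
-- (the least significant byte comes first).
leWord :: BS.ByteString -> Word64
leWord = BS.foldr (\byte acc -> (acc `shiftL` 8) .|. fromIntegral byte) 0

-- Split the content of a Message record into its 22-byte header
-- (2 + 4 + 8 + 8) and the remaining payload bytes.
splitMessage :: BS.ByteString -> Maybe (MessageHeader, BS.ByteString)
splitMessage content
  | BS.length content < 22 = Nothing
  | otherwise =
      let (cidBytes, rest1)   = BS.splitAt 2 content
          (seqBytes, rest2)   = BS.splitAt 4 rest1
          (logBytes, rest3)   = BS.splitAt 8 rest2
          (pubBytes, payload) = BS.splitAt 8 rest3
          header = MessageHeader
            (fromIntegral (leWord cidBytes))
            (fromIntegral (leWord seqBytes))
            (leWord logBytes)
            (leWord pubBytes)
      in Just (header, payload)
```

Note that nothing in these 22 bytes says how long the payload is. The payload is simply whatever remains of the record content, which is why the record's overall content length matters when parsing a Message.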

So far, we don’t actually have that much information about this message. We don’t know what topic it was published on, or how to decode its data. This information is shared across all messages for a particular topic, so it would be redundant to store it on each Message record. But for this article, we would like to filter messages based on topic, so how do we do this?

The channel_id field links us to all the missing information. So let’s now take a look at the structure of Channels and Schemas.

Schemas and Channels

Channels and Schemas provide the additional context that helps us parse the raw data in messages. As you’ll recall, Schema and Channel are record types in their own right, just like Message.

We’ll start with channels, since our message type links us directly to a channel with the channel_id field. A channel describes a single stream of messages on a particular topic. So every message on a channel comes from the same topic. As an important note, a Channel record will appear in the MCAP bag before any Message records on that channel. Here are the fields on a channel:

Bytes | Name | Type
2 | id | uint16
2 | schema_id | uint16
4 + N | topic | String
4 + N | message_encoding | String
4 + N | metadata | Map<string, string>

So each channel has a unique ID, a topic, a message encoding, and some metadata. The message encoding field is valuable for decoding our message data. Even if we know the data fields to expect in that data, there are different ways to encode it. In our case, we’ll be using “CDR”, the “Common Data Representation” format. Since the channel has the topic, we can filter messages by topic strictly by using the message records and the channel record each of them links to.
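The "4 + N" sizes in the table come from the String encoding: a uint32 length prefix followed by that many bytes. As a rough illustration, independent of the parseString primitive from earlier parts, the sketch below peels one length-prefixed string off the front of a ByteString. The helper names readLength and takeString are hypothetical, and the example bytes are made up to resemble the start of a channel's topic and encoding fields.

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import Data.Bits (shiftL, (.|.))
import Data.Word (Word32)

-- Read a uint32 little-endian length prefix from the first 4 bytes.
readLength :: BS.ByteString -> Word32
readLength =
  BS.foldr (\byte acc -> (acc `shiftL` 8) .|. fromIntegral byte) 0 . BS.take 4

-- Take one length-prefixed string off the front of the input, returning
-- the string bytes and whatever input is left over.
takeString :: BS.ByteString -> Maybe (BS.ByteString, BS.ByteString)
takeString input
  | BS.length input < 4  = Nothing  -- not enough bytes for the prefix
  | BS.length body < len = Nothing  -- prefix promises more than we have
  | otherwise            = Just (BS.splitAt len body)
  where
    body = BS.drop 4 input
    len  = fromIntegral (readLength input) :: Int

-- Made-up input: the 13-byte topic "/simple_topic" with its prefix,
-- followed by the 3-byte encoding "cdr" with its prefix.
exampleBytes :: BS.ByteString
exampleBytes =
  BS.pack [13, 0, 0, 0] <> BC.pack "/simple_topic"
    <> BS.pack [3, 0, 0, 0] <> BC.pack "cdr"
```

Calling takeString on exampleBytes yields the topic plus the remaining input, and calling it again on that remainder yields the encoding. Each call consumes 4 + N bytes, which is the figure shown in the table.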

Now depending on how much you’re willing to assume about your program, this might be enough for you to decode the data. You would just need to map the topic name to some kind of data decoding function. But if you want to write a more general program, this isn’t enough. Thus a Channel also has a schema_id field to link it to a Schema.

A Schema can give a “definition” for a message. It has 4 fields, but in our example only 2 of them get populated. The fields are:

Bytes | Name | Type
2 | id | uint16
4 + N | name | String
4 + N | encoding | String
4 + N | data | uint32 length-prefixed Bytes

Just as a channel record will appear before any messages that refer to it, a Schema record will appear in the bag before any Channel record that references it.

The ID of the schema helps channel elements link to this schema element, of course. The “name” in our case will give us the full package name of the type we’re parsing: my_package/msg/Simple. In theory, this would help us locate the original message definition.

The other two schema fields will be blank in our example. ROS2 doesn’t rely on actually storing the schema data in the bag. It relies on you linking the message package name and applying that to decode. So in that sense, our program will be less general. But we’ll still go through the motions of tracking our schema and channel elements.

Updating our Monad

Since the Message record itself doesn’t contain the topic, we need to keep track of schema and channel elements that we’ve already parsed. The simplest way to do this is to use the State monad. We’re already operating within a ParsecT IO monad transformer stack. So what we need to do is define a stateful type for this information, and add StateT to this stack.

Our stateful type should track 5 things:

  1. The set of topics we’re interested in
  2. The set of channels we’re interested in
  3. A map from channel IDs to channel data
  4. A map from schema IDs to schema data
  5. A map of the message data we’ve received for our topics

We’ll start by making types that reflect the information we parse from these records, one type for schema, one for channel, and one for message. These are self-explanatory, following the field definitions from the specification.

data MsgSchema = MsgSchema
  { schemaId :: Word16
  , schemaName :: ByteString
  , schemaEncoding :: ByteString
  , schemaData :: ByteString
  } deriving (Show, Eq)

data MsgChannel = MsgChannel
  { channelId :: Word16
  , channelSchemaId :: Word16
  , channelTopic :: ByteString
  , channelEncoding :: ByteString
  , channelMetadata :: HM.HashMap ByteString ByteString
  } deriving (Show, Eq)

data MessageData = MessageData
  { messageLogTime :: Word64
  , messagePublishTime :: Word64
  , messageData :: ByteString
  } deriving (Show, Eq)

Now we can define a type TopicState that wraps the five elements we listed:

data TopicState = TopicState
  { tsChannels :: HM.HashMap Word16 MsgChannel
  , tsSchemas :: HM.HashMap Word16 MsgSchema
  , tsMessages :: HM.HashMap (ByteString, ByteString) [MessageData]
  , tsDesiredTopics :: HS.HashSet ByteString
  , tsDesiredChannels :: HS.HashSet Word16
  } deriving (Show, Eq)

The first item (desired topics) will be static; we’ll define it at the start. The other fields will all get modified as we parse the relevant records. In the case of the message data, we’ll use the topic name and schema name as the key in our map.

Now we’ll update our monad to include this as a stateful object:

type Parser a = ParsecT Void ByteString (StateT TopicState IO) a

We also need to update our monadic entry point. Using runParserT now gives a StateT action, so we need to run it with an initial state to get back to an IO action. We use runStateT rather than evalStateT because we also want the final TopicState, which we print after the records.

printRecordTypesFromFile :: FilePath -> IO ()
printRecordTypesFromFile fp = do
  input <- BS.readFile fp
  let initialTopicState = TopicState HM.empty HM.empty HM.empty (HS.singleton "/simple_topic") HS.empty
  -- runStateT returns the parse result paired with the final state
  (result, st) <- runStateT (runParserT parseMcapFile' fp input) initialTopicState
  case result of
    Left e -> print e
    Right recs -> do
      forM_ recs $ \(_, rec) -> printRec rec
      print st

Otherwise, this shouldn’t impact most of our Parser functions. If you put in any print statements using lift, these would just need an additional lift now since IO is one level further down our monad stack (or you could use liftIO).
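Here is a small sketch of that difference between lift and liftIO. To keep it self-contained with only the transformers package, it uses ReaderT as a stand-in for ParsecT as the outer layer, so the Stack type and the function names are assumptions for illustration and not our actual Parser type.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader (ReaderT, runReaderT)
import Control.Monad.Trans.State.Strict (StateT, evalStateT, get, modify')

-- Stand-in for the article's stack: ReaderT plays the role of ParsecT
-- (the outer transformer), with StateT and IO underneath it.
type Stack a = ReaderT () (StateT Int IO) a

-- One lift reaches StateT; a second lift is needed to reach IO.
logWithLift :: String -> Stack ()
logWithLift msg = lift (lift (putStrLn msg))

-- liftIO finds IO at the bottom of the stack regardless of depth.
logWithLiftIO :: String -> Stack ()
logWithLiftIO msg = liftIO (putStrLn msg)

-- Stateful actions live one layer down, so they need a single lift.
bump :: Stack ()
bump = lift (modify' (+ 1))

-- Mix logging and state updates, then read the counter back.
example :: Stack Int
example = do
  logWithLift "parsed a record"
  bump
  logWithLiftIO "parsed another record"
  bump
  lift get

-- Peel the layers off from the outside in, starting the counter at 0.
runStack :: Stack a -> IO a
runStack action = evalStateT (runReaderT action ()) 0
```

The same pattern shows up in the parsing functions in this part: stateful helpers are written against StateT TopicState IO and brought into Parser with a single lift.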

Parsing a Schema

Now that we’ve defined the new types for this part, let’s start using them. We already have a function parseChunk that parses relevant information from a Chunk record. Let’s write similar functions for the Schema, Channel and Message record types. Each one will not only parse the data, but also save it in our state. For schemas, that means the tsSchemas map.

Let’s start by parsing all the core fields of the Schema element and turning these into a MsgSchema object. This uses our existing primitives and patterns:

parseSchema :: Parser (Word64, Record)
parseSchema = do
  (n1, sid) <- parseUint16LE
  (n2, sname) <- parseString
  (n3, sencoding) <- parseString
  (n4, bytesLen) <- parseUint32LE
  schema <- BS.pack <$> count (fromIntegral bytesLen) anySingle
  let totalLen = n1 + n2 + n3 + n4 + fromIntegral bytesLen
  let schema' = MsgSchema sid sname sencoding schema
  ...

Now we need a StateT action that will “save” this schema element. We pull out the topic state, insert our new schema element into the tsSchemas field by its ID, and then put the updated state back.

parseSchema :: Parser (Word64, Record)
parseSchema = do
  (n1, sid) <- parseUint16LE
  (n2, sname) <- parseString
  (n3, sencoding) <- parseString
  (n4, bytesLen) <- parseUint32LE
  schema <- BS.pack <$> count (fromIntegral bytesLen) anySingle
  let totalLen = n1 + n2 + n3 + n4 + fromIntegral bytesLen
  let schema' = MsgSchema sid sname sencoding schema
  ...
  where
    addSchema :: MsgSchema -> StateT TopicState IO ()
    addSchema s = do
      tops <- get
      let newS = HM.insert (schemaId s) s (tsSchemas tops)
      put $ tops { tsSchemas = newS }

To complete the function, we just lift our stateful action into the Parser monad, and return a Record tagged with the record type, as we have been doing so far.

parseSchema :: Parser (Word64, Record)
parseSchema = do
  (n1, sid) <- parseUint16LE
  (n2, sname) <- parseString
  (n3, sencoding) <- parseString
  (n4, bytesLen) <- parseUint32LE
  schema <- BS.pack <$> count (fromIntegral bytesLen) anySingle
  let totalLen = n1 + n2 + n3 + n4 + fromIntegral bytesLen
  let schema' = MsgSchema sid sname sencoding schema
  lift (addSchema schema')
  return $ (totalLen, Record Schema)
  where
    addSchema :: MsgSchema -> StateT TopicState IO ()
    addSchema s = do
      tops <- get
      let newS = HM.insert (schemaId s) s (tsSchemas tops)
      put $ tops { tsSchemas = newS }
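As an aside, the get/put pair in addSchema can also be written as a single modify' call. The sketch below shows that variant with simplified stand-in types (String instead of ByteString, Data.Map instead of HashMap, and only two schema fields) so that it compiles with just the containers and transformers packages. The second schema name is made up for the example.

```haskell
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')
import qualified Data.Map.Strict as M
import Data.Word (Word16)

-- Simplified stand-ins for the article's types.
data MsgSchema = MsgSchema
  { schemaId   :: Word16
  , schemaName :: String
  } deriving (Show, Eq)

newtype TopicState = TopicState
  { tsSchemas :: M.Map Word16 MsgSchema
  } deriving (Show, Eq)

-- Same effect as the get/put version, expressed as one state update.
addSchema :: MsgSchema -> StateT TopicState IO ()
addSchema s = modify' $ \tops ->
  tops { tsSchemas = M.insert (schemaId s) s (tsSchemas tops) }

-- Run two insertions starting from an empty state and return the result.
exampleState :: IO TopicState
exampleState = execStateT
  (addSchema (MsgSchema 1 "my_package/msg/Simple")
     >> addSchema (MsgSchema 2 "my_package/msg/Other"))
  (TopicState M.empty)
```

Both styles behave the same here. The explicit get/put form reads more naturally once the update becomes conditional, as it does for channels.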

Parsing a Channel

Parsing a channel is mostly similar. We parse the fields using our primitives, and we start to define a function to add this channel into our topic state:

parseChannel :: Parser (Word64, Record)
parseChannel = do
  (n1, cid) <- parseUint16LE
  (n2, sid) <- parseUint16LE
  (n3, topic) <- parseString
  (n4, encoding) <- parseString
  (n5, mp) <- parseStrMap
  let totalLen = n1 + n2 + n3 + n4 + n5
  let channel = MsgChannel cid sid topic encoding mp
  lift (addChannel channel)
  return $ (totalLen, Record Channel)
  where
    addChannel :: MsgChannel -> StateT TopicState IO ()
    addChannel = ...

There are two structural differences between addChannel and addSchema. First, we only care about adding the channel if the topic is one of our desired topics. Second, we also want to update the desired channels set in addition to saving the channel by its ID. Otherwise things are the same:

parseChannel :: Parser (Word64, Record)
parseChannel = do
  (n1, cid) <- parseUint16LE
  (n2, sid) <- parseUint16LE
  (n3, topic) <- parseString
  (n4, encoding) <- parseString
  (n5, mp) <- parseStrMap
  let totalLen = n1 + n2 + n3 + n4 + n5
  let channel = MsgChannel cid sid topic encoding mp
  lift (addChannel channel)
  return $ (totalLen, Record Channel)
  where
    addChannel c = do
      let topic = channelTopic c
      tops <- get
      when (HS.member topic (tsDesiredTopics tops)) $ do
        let newC = HM.insert (channelId c) c (tsChannels tops)
        let newDC = HS.insert (channelId c) (tsDesiredChannels tops)
        put $ tops { tsChannels = newC, tsDesiredChannels = newDC }
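To see the filtering behavior in isolation, here is a pure sketch of the same update logic. It uses simplified stand-in types (String, Data.Map and Data.Set in place of ByteString, HashMap and HashSet) and a hypothetical registerChannel function, so it is an illustration of the idea rather than the code in our parser. The "/other_topic" channel is invented for the example.

```haskell
import qualified Data.Map.Strict as M
import qualified Data.Set as S
import Data.Word (Word16)

-- Simplified stand-ins for the article's types.
data MsgChannel = MsgChannel
  { channelId    :: Word16
  , channelTopic :: String
  } deriving (Show, Eq)

data TopicState = TopicState
  { tsChannels        :: M.Map Word16 MsgChannel
  , tsDesiredTopics   :: S.Set String
  , tsDesiredChannels :: S.Set Word16
  } deriving (Show, Eq)

-- Record the channel only if its topic is one we asked for; otherwise
-- hand back the state untouched.
registerChannel :: MsgChannel -> TopicState -> TopicState
registerChannel c tops
  | S.member (channelTopic c) (tsDesiredTopics tops) =
      tops { tsChannels        = M.insert (channelId c) c (tsChannels tops)
           , tsDesiredChannels = S.insert (channelId c) (tsDesiredChannels tops)
           }
  | otherwise = tops

initialState :: TopicState
initialState = TopicState M.empty (S.singleton "/simple_topic") S.empty

-- Channel 1 is on a desired topic and is kept; channel 2 is dropped.
afterTwoChannels :: TopicState
afterTwoChannels =
  registerChannel (MsgChannel 2 "/other_topic")
    (registerChannel (MsgChannel 1 "/simple_topic") initialState)
```

After both calls, tsDesiredChannels contains only channel 1, so any later message on channel 2 can be skipped with a single set lookup.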

Parsing a Message

Finally, let’s parse a Message record. Unlike the other record types, the data field for a message does not prepend its own size. So we need to pass the recordContentLength as an additional argument for the function. This lets us parse all the message data:

parseMessage :: Word64 -> Parser (Word64, Record)
parseMessage len = do
  (a, channelId) <- parseUint16LE
  (b, sequence) <- parseUint32LE
  (c, logTime) <- parseTimestamp
  (d, publishTime) <- parseTimestamp
  let overheadLen = a + b + c + d
  guard' ("Message overhead exceeds content length: " <> show overheadLen <> " " <> show len) (overheadLen <= len)
  dataBytes <- BS.pack <$> count (fromIntegral (len - overheadLen)) anySingle
  ...

Now we have two jobs. First, we want to see if this message comes from a desired topic, and if we can find a schema for that topic. Then we want to save the message if so. For the first step, we’ll write a function that will take the TopicState and return the schema and topic name if they are desired.

parseMessage :: Word64 -> Parser (Word64, Record)
parseMessage len = do
  (a, channelId) <- parseUint16LE
  (b, sequence) <- parseUint32LE
  (c, logTime) <- parseTimestamp
  (d, publishTime) <- parseTimestamp
  let overheadLen = a + b + c + d
  guard' ("Message overhead exceeds content length: " <> show overheadLen <> " " <> show len) (overheadLen <= len)
  dataBytes <- BS.pack <$> count (fromIntegral (len - overheadLen)) anySingle
  tops <- lift get
  let schemaToDecode = findSchema tops channelId
  case schemaToDecode of
    Nothing -> return (len, Record Message)
    ...
  where
    findSchema :: TopicState -> Word16 -> Maybe (MsgSchema, ByteString)
    findSchema (TopicState chans schems _ _ desChans) channelId = do
      guard (HS.member channelId desChans)
      (MsgChannel _ sid topicName _ _) <- HM.lookup channelId chans
      schem <- HM.lookup sid schems
      return (schem, topicName)

We get the TopicState and then pass it to a function in the Maybe monad that checks three conditions in order: whether the channel ID is desired, whether we can find the channel for this ID, and whether we can find the schema for that channel. If any of these checks fails, the result is Nothing and we simply skip processing the message.
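The short-circuiting is easier to see with concrete inputs. The sketch below is a simplified version of findSchema that takes the three collections as separate arguments and uses Data.Map and Data.Set with String values, so it runs with only the containers package. The sample IDs and the "/orphan_topic" channel are made up for illustration.

```haskell
import Control.Monad (guard)
import qualified Data.Map.Strict as M
import qualified Data.Set as S
import Data.Word (Word16)

-- channel id -> (schema id, topic name)
type Channels = M.Map Word16 (Word16, String)

-- schema id -> schema name
type Schemas = M.Map Word16 String

-- Each step can fail; the first failure makes the whole result Nothing.
findSchema :: S.Set Word16 -> Channels -> Schemas -> Word16 -> Maybe (String, String)
findSchema desired chans schems cid = do
  guard (S.member cid desired)         -- is the channel one we want?
  (sid, topic) <- M.lookup cid chans   -- do we know this channel?
  schemaName <- M.lookup sid schems    -- do we know its schema?
  return (schemaName, topic)

sampleChannels :: Channels
sampleChannels = M.fromList [(1, (10, "/simple_topic")), (2, (99, "/orphan_topic"))]

sampleSchemas :: Schemas
sampleSchemas = M.fromList [(10, "my_package/msg/Simple")]

sampleDesired :: S.Set Word16
sampleDesired = S.fromList [1, 2]

-- Channel 1 resolves fully. Channel 2 is desired but points at schema 99,
-- which is missing. Channel 3 is not desired at all.
results :: [Maybe (String, String)]
results = map (findSchema sampleDesired sampleChannels sampleSchemas) [1, 2, 3]
```

Only the first lookup succeeds. The other two fail at different steps, yet the caller sees the same Nothing either way, which is all parseMessage needs in order to skip the message.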

Now we need an addMessage function similar to addSchema and addChannel above. We associate the message data (timestamps and bytes) with the topic name and the schema name in that map.

parseMessage :: Word64 -> Parser (Word64, Record)
parseMessage len = do
  (a, channelId) <- parseUint16LE
  (b, sequence) <- parseUint32LE
  (c, logTime) <- parseTimestamp
  (d, publishTime) <- parseTimestamp
  let overheadLen = a + b + c + d
  guard' ("Message overhead exceeds content length: " <> show overheadLen <> " " <> show len) (overheadLen <= len)
  dataBytes <- BS.pack <$> count (fromIntegral (len - overheadLen)) anySingle
  tops <- lift get
  let schemaToDecode = findSchema tops channelId
  case schemaToDecode of
    Nothing -> return (len, Record Message)
    Just ((MsgSchema _ sname _ _), topicName) -> do
      lift $ addMessage topicName sname logTime publishTime dataBytes
      return (len, Record Message)
  where
    findSchema :: TopicState -> Word16 -> Maybe (MsgSchema, ByteString)
    findSchema (TopicState chans schems _ _ desChans) channelId = do
      guard (HS.member channelId desChans)
      (MsgChannel _ sid topicName _ _) <- HM.lookup channelId chans
      schem <- HM.lookup sid schems
      return (schem, topicName)

    addMessage topicName schemaName logTime publishTime dataBytes = do
      tops <- get
      let prevMsgs = fromMaybe [] $ HM.lookup (topicName, schemaName) (tsMessages tops)
      let newTsm = HM.insert (topicName, schemaName) (MessageData logTime publishTime dataBytes : prevMsgs) (tsMessages tops)
      put $ tops { tsMessages = newTsm }
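One consequence of prepending in addMessage is that each list ends up newest-first. The sketch below shows the same accumulation written with insertWith, plus a hypothetical inFileOrder helper that restores the original order once parsing is done. It uses Data.Map and String as stand-ins for HashMap and ByteString, and the timestamps are made up.

```haskell
import qualified Data.Map.Strict as M
import Data.Word (Word64)

-- (topic name, schema name), as in the article's tsMessages key.
type Key = (String, String)

-- Simplified stand-in for the article's MessageData.
data MessageData = MessageData
  { messageLogTime :: Word64
  , messageData    :: String
  } deriving (Show, Eq)

-- Prepend a message to the list stored under its key, creating the
-- list if the key is not present yet. insertWith applies (++) to the
-- new value first and the existing value second.
addMessage :: Key -> MessageData -> M.Map Key [MessageData] -> M.Map Key [MessageData]
addMessage key msg = M.insertWith (++) key [msg]

-- Restore file order (oldest first) once parsing is finished.
inFileOrder :: M.Map Key [MessageData] -> M.Map Key [MessageData]
inFileOrder = M.map reverse

simpleKey :: Key
simpleKey = ("/simple_topic", "my_package/msg/Simple")

-- Add an earlier message, then a later one.
collected :: M.Map Key [MessageData]
collected =
  addMessage simpleKey (MessageData 2 "Goodbye")
    (addMessage simpleKey (MessageData 1 "Hello") M.empty)
```

In collected, the later message sits at the head of the list, and inFileOrder flips it back. Prepending keeps each insertion cheap, so reversing once at the end is usually a reasonable trade.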

Bringing it Together

Now we just need to update parseSingleRecord so that it handles these 3 cases in addition to the Chunk case!

parseSingleRecord :: Parser (Word64, Record)
parseSingleRecord = do
  (typLen, typ) <- parseRecordType
  (rclLen, recordContentLength) <- parseUint64LE
  let totalLen = typLen + rclLen + recordContentLength
  record <- case typ of
    Chunk -> do
      (parsedChunkLength, rec) <- parseChunk
      guard' ("Parsed chunk length does not match: " <> show parsedChunkLength <> " " <> show recordContentLength) (parsedChunkLength == recordContentLength)
      return rec
    Channel -> do
      (parsedChannelLength, rec) <- parseChannel
      guard' ("Parsed channel length does not match: " <> show parsedChannelLength <> " " <> show recordContentLength) (parsedChannelLength == recordContentLength)
      return rec
    Schema -> do
      (parsedSchemaLength, rec) <- parseSchema
      guard' ("Parsed schema length does not match: " <> show parsedSchemaLength <> " " <> show recordContentLength) (parsedSchemaLength == recordContentLength)
      return rec
    Message -> do
      (parsedMessageLength, rec) <- parseMessage recordContentLength
      guard' ("Parsed message length does not match: " <> show parsedMessageLength <> " " <> show recordContentLength) (parsedMessageLength == recordContentLength)
      return rec
    _ -> do
      count (fromIntegral recordContentLength) anySingle
      return $ Record typ
  return (totalLen, record)

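Our program will now parse the full bag and print out the header structure as well as the final TopicState at the end! Here’s what we see for our “message data”. Note that the later message comes first, since addMessage prepends each new message to the list: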

Message 1:

MessageData {messageLogTime = 1757733836313810115, messagePublishTime = 1757733836313810115, messageData = "\NUL\SOH\NUL\NUL\ACK\NUL\NUL\NUL\NUL\NUL\NUL\NUL333333\EM@\b\NUL\NUL\NULGoodbye\NUL"}

Message 2:

MessageData {messageLogTime = 1757733836289706147, messagePublishTime = 1757733836289706147, messageData = "\NUL\SOH\NUL\NUL\ENQ\NUL\NUL\NUL\NUL\NUL\NUL\NUL\205\204\204\204\204\204\DLE@\ACK\NUL\NUL\NULHello\NUL"}

Conclusion

The data we have still isn’t structured though! We need to figure out how to parse these bytes out of the CDR encoding. We’ll do that next time!

In the meantime, you can learn more about parsing (and other problem solving tactics) by signing up for Solve.hs, our problem solving course! The first 3 modules will teach you about Data Structures and Algorithms in Haskell, and then the final module will teach you advanced parsing techniques like Megaparsec!
