MessageQueue Class Reference

A doubly-linked queue of messages. More...

#include <message.h>

Collaboration diagram for MessageQueue:
Collaboration graph

Detailed Description

A doubly-linked queue of messages.

Player Message objects are delivered by being pushed on and popped off queues of this type. Importantly, every driver has one, Driver::InQueue. All messages sent to the driver arrive on this queue. However, the driver rarely pops messages off its queue directly; instead the driver calls Driver::ProcessMessages, which hands off each incoming message to Driver::ProcessMessage (which the driver has presumably re-implemented).

Every queue has a maximum length that is determined when it is created; for drivers this happens in the constructor Driver::Driver. This length determines the maximum number of messages that can be pushed onto the queue. Since messages vary greatly in size, there is not a direct correspondence between the length of a queue and the memory that it occupies. Furthermore, since messages are reference counted and may be shared across multiple queues, it is not necessarily meaningful to consider how much memory a given queue "occupies."

The queue supports configurable message replacement. This functionality is useful when, for example, a driver wants new incoming commands to overwrite old ones, rather than to have them queue up. To decide whether an incoming message should replace an existing message that has the same address (host:robot:interface:index), type, and subtype, the following logic is applied:

  • If a matching replacement rule was set via AddReplaceRule(), that rule is followed.
  • Else:
    • If the message type is PLAYER_MSGTYPE_REQ, PLAYER_MSGTYPE_RESP_ACK, or PLAYER_MSGTYPE_RESP_NACK, the message is not replaced.
    • Else:

Most drivers can simply choose true or false in their constructors (this setting is passed on to the MessageQueue constructor). However, drivers that support multiple interfaces may use AddReplaceRule() to establish different replacement rules for messages that arrive for different interfaces. For example, the p2os driver has incoming commands to the wheelmotors overwrite each other, but queues up commands to the manipulator arm.

The queue also supports filtering based on device address. After SetFilter() is called, Pop() will only return messages that match the given filter. Use ClearFilter() to return to normal operation. This filter is not usually manipulated directly in driver code; it's main use inside Device::Request.

Public Member Functions

 MessageQueue (bool _Replace, size_t _Maxlen)
 Create an empty message queue.
 
 ~MessageQueue ()
 Destroy a message queue.
 
bool Empty ()
 Check whether a queue is empty.
 
bool Push (Message &msg)
 Push a message onto the queue. More...
 
void PushFront (Message &msg, bool haveLock)
 Put it at the front of the queue, without checking replacement rules or size limits. More...
 
void PushBack (Message &msg, bool haveLock)
 Push a message at the back of the queue, without checking replacement rules or size limits. More...
 
MessagePop ()
 Pop a message off the queue. More...
 
void SetReplace (bool _Replace)
 Set the Replace flag, which governs whether data and command messages of the same subtype from the same device are replaced in the queue. More...
 
void AddReplaceRule (int _host, int _robot, int _interf, int _index, int _type, int _subtype, int _replace)
 Add a replacement rule to the list. More...
 
void AddReplaceRule (const player_devaddr_t &device, int _type, int _subtype, int _replace)
 Add a replacement rule to the list. More...
 
int CheckReplace (player_msghdr_t *hdr)
 Check whether a message with the given header should replace any existing message of the same signature, be ignored or accepted. More...
 
bool Wait (double TimeOut=0.0)
 Wait on this queue. More...
 
void DataAvailable (void)
 Signal that new data is available. More...
 
bool Filter (Message &msg)
 Check whether a message passes the current filter.
 
void ClearFilter (void)
 Clear (i.e., turn off) message filter.
 
void SetFilter (int host, int robot, int interf, int index, int type, int subtype)
 Set filter values.
 
void SetPull (bool _pull)
 Set the pull flag.
 
size_t GetLength (void)
 Get current length of queue, in elements.
 
void SetDataRequested (bool d, bool haveLock)
 Set the data_requested flag.
 

Private Member Functions

void Lock ()
 Lock the mutex associated with this queue.
 
void Unlock ()
 Unlock the mutex associated with this queue.
 
void Remove (MessageQueueElement *el)
 Remove element el from the queue, and rearrange pointers appropriately. More...
 

Private Attributes

MessageQueueElementhead
 Head of the queue.
 
MessageQueueElementtail
 Tail of the queue.
 
pthread_mutex_t lock
 Mutex to control access to the queue, via Lock() and Unlock().
 
size_t Maxlen
 Maximum length of queue in elements.
 
MessageReplaceRulereplaceRules
 Singly-linked list of replacement rules.
 
bool Replace
 When a (data or command) message doesn't match a rule in replaceRules, should we replace it?
 
size_t Length
 Current length of queue, in elements.
 
pthread_cond_t cond
 A condition variable that can be used to signal, via DataAvailable(), other threads that are Wait()ing on this queue. More...
 
pthread_mutex_t condMutex
 Mutex to go with condition variable cond.
 
bool filter_on
 Current filter values.
 
int filter_host
 
int filter_robot
 
int filter_interf
 
int filter_index
 
int filter_type
 
int filter_subtype
 
bool pull
 Flag for if in pull mode. More...
 
bool data_requested
 Flag for data was requested (in PULL mode), but none has yet been delivered.
 
bool data_delivered
 Flag that data was sent (in PULL mode)
 
bool drop_count
 Count of the number of messages discarded due to queue overflow.
 

Member Function Documentation

◆ AddReplaceRule() [1/2]

void MessageQueue::AddReplaceRule ( int  _host,
int  _robot,
int  _interf,
int  _index,
int  _type,
int  _subtype,
int  _replace 
)

Add a replacement rule to the list.

The first 6 arguments determine the signature that a message will have to match in order for this rule to be applied. If an incoming message matches this signature, the last argument determines whether replacement will occur. Set any of the first 6 arguments to -1 to indicate don't care.

◆ AddReplaceRule() [2/2]

void MessageQueue::AddReplaceRule ( const player_devaddr_t device,
int  _type,
int  _subtype,
int  _replace 
)

Add a replacement rule to the list.

Use this version if you already have the device address assembled in a player_devaddr_t structure. The tradeoff is that you cannot use -1 to indicate don't care for those values (the fields in that structure are unsigned).

◆ CheckReplace()

int MessageQueue::CheckReplace ( player_msghdr_t hdr)

Check whether a message with the given header should replace any existing message of the same signature, be ignored or accepted.

◆ DataAvailable()

void MessageQueue::DataAvailable ( void  )

Signal that new data is available.

Calling this method will release any threads currently waiting on this queue.

◆ Pop()

Message* MessageQueue::Pop ( )

Pop a message off the queue.

Pop the head (i.e., the first-inserted) message from the queue. Returns pointer to said message, or NULL if the queue is empty

◆ Push()

bool MessageQueue::Push ( Message msg)

Push a message onto the queue.

Returns the success state of the Push operation (true if successful, false otherwise).

◆ PushBack()

void MessageQueue::PushBack ( Message msg,
bool  haveLock 
)

Push a message at the back of the queue, without checking replacement rules or size limits.

This method is used internally to insert most messages. The caller may have already called Lock() on this queue

◆ PushFront()

void MessageQueue::PushFront ( Message msg,
bool  haveLock 
)

Put it at the front of the queue, without checking replacement rules or size limits.

This method is used to insert responses to requests for data. The caller may have already called Lock() on this queue

◆ Remove()

void MessageQueue::Remove ( MessageQueueElement el)
private

Remove element el from the queue, and rearrange pointers appropriately.

◆ SetReplace()

void MessageQueue::SetReplace ( bool  _Replace)
inline

Set the Replace flag, which governs whether data and command messages of the same subtype from the same device are replaced in the queue.

◆ Wait()

bool MessageQueue::Wait ( double  TimeOut = 0.0)

Wait on this queue.

This method blocks until new data is available (as indicated by a call to DataAvailable()).

If TimeOut is set to a positive value this method will return false if the timeout occurs before and update is recieved.

Member Data Documentation

◆ cond

pthread_cond_t MessageQueue::cond
private

A condition variable that can be used to signal, via DataAvailable(), other threads that are Wait()ing on this queue.

◆ pull

bool MessageQueue::pull
private

Flag for if in pull mode.

If false, push mode. Push is default mode, but pull is the recommended method to avoid getting delays in data on the client.


The documentation for this class was generated from the following file: