| Mercury Monitor Reference Manual |
|---|
Connections — Management of network connections |
#include <monitor/producer/connection.h>
prod_msg;
enum prod_msg_class;
prod_msglist;
prod_msgqueue;
prod_conn;
enum prod_conn_type;
enum prod_conn_state;
enum prod_conn_flags;
prod_outgoing_data;
int prod_add_listener (const char *url);
prod_conn* prod_conn_new (prod_proto_module *proto);
void prod_conn_done (prod_conn *conn);
void prod_conn_ref (prod_conn *conn);
void prod_conn_close (prod_conn *conn);
int prod_conn_send (prod_conn *conn,
prod_msg *msg);
int prod_conn_send_caps (prod_conn *conn);
int prod_conn_send_metric (prod_conn *conn,
uint32_t metric_id,
prod_metric_value *mv);
int prod_conn_send_response (prod_conn *conn,
prod_cmd *cmd,
uint32_t status,
uint32_t metric_id);
int prod_conn_send_data (prod_conn *conn,
uint32_t metric_id,
const char *data,
uint32_t len);
int prod_conn_send_wrap (prod_conn *conn);
void prod_conn_msg_del (prod_conn *conn,
uint32_t metric_id);
void prod_conn_mid_req_add (prod_conn *conn,
uint32_t id);
void prod_conn_mid_req_del (prod_conn *conn,
uint32_t id);
void prod_conn_add_killed (prod_conn *conn,
uint32_t id);
void prod_conn_del_killed (prod_conn *conn,
uint32_t id);
int prod_conn_is_killed (prod_conn *conn,
uint32_t id);
void prod_conn_barrier_add (prod_conn *conn,
uint32_t id);
void prod_conn_barrier_del (prod_conn *conn,
uint32_t id);
prod_conn* prod_conn_incoming_new (prod_proto_module *proto,
int sd,
struct sockaddr *peer_addr,
unsigned int addrlen);
prod_conn* prod_conn_outgoing_new (prod_proto_module *proto,
prod_outgoing_data *odata);
int prod_conn_outgoing_open (prod_conn *conn);
int prod_conn_outgoing_handshake (prod_conn *conn);
typedef struct {
/* Message format */
mon_format format : 16;
/* Message delivery class */
prod_msg_class msg_class : 16;
/* Metric ID */
uint32_t id;
/* Pointer to the next message in the queue */
prod_msg *next;
/* Message contents */
union
{
mon_msg_cmd_response resp;
prod_metric_value *mv;
mon_msg_producer_caps caps;
mon_msg_data data;
};
} prod_msg;
Represents a message in the wait queue of a connection.
| mon_format format : 16; | format of the message. |
| prod_msg_class msg_class : 16; | the class of the message. |
| uint32_t id; | the metric ID of the message. |
| prod_msg *next; | pointer to the next message in the queue. |
| mon_msg_cmd_response resp; | a mon_msg_cmd_response. |
| prod_metric_value *mv; | a prod_metric_value. |
| mon_msg_producer_caps caps; | a mon_msg_producer_caps. |
| mon_msg_data data; | a mon_msg_data. |
typedef enum {
/* Do not send automatically. */
PROD_MSG_DEFER,
/* Should be sent immediately, regardless of conn->barrier. */
PROD_MSG_IMMED,
/* Should be sent when conn->barrier is not set. */
PROD_MSG_SUBSCRIBED,
/* Marks a barrier for WRAP. */
PROD_MSG_WRAP
} prod_msg_class;
Message classes for messages in the wait queue.
| PROD_MSG_DEFER | deferred until explicitly requested by a GET command. |
| PROD_MSG_IMMED | send immediately before any PROD_MSG_DEFER or PROD_MSG_SUBSCRIBED messages. |
| PROD_MSG_SUBSCRIBED | send it as soon as possible unless it is explicitly blocked. |
| PROD_MSG_WRAP | virtual message for synchronization needed by the WRAP command. |
typedef struct {
/* The first element in the list */
prod_msg *head;
/* The last element in the list */
prod_msg *tail;
/* Number of messages that were dropped */
unsigned int dropped;
} prod_msglist;
Head of a list containing prod_msg entries.
typedef struct {
/* The first message that is ready to send */
prod_msg *head;
/* The last message that is ready to send */
prod_msg *tail;
/* Messages that are either DEFERred or blocked. Key is the
* message id, value is a prod_msglist */
GHashTable *pending;
/* Total number of messages in the queue */
unsigned int total;
/* Max. number of messages */
unsigned int limit;
/* Number of messages that was dropped since the queue became full.
* It is the sum of the dropped fields of the message lists in the
* pending table */
unsigned int dropped;
} prod_msgqueue;
Describes the message queue for a prod_conn.
| prod_msg *head; | the first element of the "ready" list. |
| prod_msg *tail; | the last element of the "ready" list. |
| GHashTable *pending; | the table of pending messages. Key is the message ID, the value is a prod_msglist structure. |
| unsigned int total; | total number of messages in the queue ("ready" and "pending" combined). |
| unsigned int limit; | max. number of messages in the queue. If more messages arrive, they will be dropped. |
| unsigned int dropped; | number of messages dropped since the queue became full. When the queue is no longer full, a MON_MID_DROPPED message is sent to the consumer and this counter is reset to 0. |
typedef struct {
prod_conn_type type : 8;
prod_conn_state state : 8;
prod_conn_flags flags : 16;
mon_bio_head bh;
mon_addrlist *peer_addr;
prod_outgoing_data *outgoing;
mon_bio *pending_bio;
mon_auth_creds *auth_creds;
mon_auth_state *auth_state;
prod_proto_module *proto;
prod_proto_state *proto_state;
prod_user *user;
prod_conn *parent;
/* Metric IDs for which a get command is in effect */
GHashTable *mid_req;
/* Metric IDs that should be held back */
GHashTable *barrier;
/* Metric IDs subscribed to this connection */
GHashTable *subscribed;
/* Metric IDs killed but not yet STOPped */
GList *killed_ids;
/* List of pending commands */
GList *pending_cmds;
/* Messages waiting for encoding by the protocol module */
prod_msgqueue msgqueue;
uint32_t id;
int refcnt;
} prod_conn;
Represents a connection to a consumer.
| prod_conn_type type : 8; | prod_conn_type. |
| prod_conn_state state : 8; | prod_conn_state. |
| prod_conn_flags flags : 16; | prod_conn_flags. |
| mon_bio_head bh; | a mon_bio_head used for I/O. |
| mon_addrlist *peer_addr; | the network address of the peer. |
| prod_outgoing_data *outgoing; | extra data for outgoing connections. |
| mon_bio *pending_bio; | BIO pending for a WRAP command. |
| mon_auth_creds *auth_creds; | authentication credentials. |
| mon_auth_state *auth_state; | authentication state. |
| prod_proto_module *proto; | protocol module. |
| prod_proto_state *proto_state; | the internal protocol state. |
| prod_user *user; | a prod_user this connection belongs to. |
| prod_conn *parent; | the prod_conn that created this connection. |
| GHashTable *mid_req; | hash table for metric IDs requested by GET commands. |
| GHashTable *barrier; | hash table for metric IDs that have to be temporarily blocked. |
| GHashTable *subscribed; | hash table for metric IDs that are subscribed to this connection. |
| GList *killed_ids; | metric IDs that were killed but have not been STOPped yet. |
| GList *pending_cmds; | list of pending commands. |
| prod_msgqueue msgqueue; | message queue. |
| uint32_t id; | channel id. |
| int refcnt; | reference count. |
typedef enum {
PROD_CHANNEL_INBOUND, /* consumer-initiated */
PROD_CHANNEL_OUTBOUND /* producer-initiated */
} prod_conn_type;
Type of a prod_conn.
| PROD_CHANNEL_INBOUND | this is an inbound connection. |
| PROD_CHANNEL_OUTBOUND | this is an outbound connection. |
typedef enum {
PROD_STATE_CONNECTING, /* connect() in progress */
PROD_STATE_HANDSHAKE, /* BIO handshake in progress */
PROD_STATE_NOTAUTH, /* Open but not yet authenticated */
PROD_STATE_READY, /* Authenticated and ready */
PROD_STATE_WRAP, /* Processing a WRAP command */
PROD_STATE_CLOSING, /* The connection is closing */
PROD_STATE_CLOSED /* The connection is closed */
} prod_conn_state;
State of a prod_conn.
| PROD_STATE_CONNECTING | connect() is in progress. |
| PROD_STATE_HANDSHAKE | BIO handshake is in progress. |
| PROD_STATE_NOTAUTH | the network connection is ready but authentication has not been performed yet. |
| PROD_STATE_READY | the connection is authenticated and ready to receive commands or send data. |
| PROD_STATE_WRAP | a WRAP command is being processed. |
| PROD_STATE_CLOSING | the connection is closing. |
| PROD_STATE_CLOSED | the network connection is closed. |
typedef enum {
/* Marks a persistent connection */
PROD_CONN_PERSISTENT = (1 << 0),
/* If set, a read operation should be performed when the socket is
* available for writing */
PROD_CONN_RETRY_READ = (1 << 1),
/* If set, a write operation should be performed when the socket is
* available for reading */
PROD_CONN_RETRY_WRITE = (1 << 2),
/* If set, a reconnect is scheduled for the connection */
PROD_CONN_RECONNECT = (1 << 3),
/* If set, call msgqueue_run() when there is free space in the buffer */
PROD_CONN_DO_RUNQUEUE = (1 << 4),
/* The output buffers are completely flushed */
PROD_CONN_FLUSHED = (1 << 5),
/* MON_IO_READ is set on the socket descriptor */
PROD_CONN_READ_SET = (1 << 6),
/* MON_IO_WRITE is set on the socket descriptor */
PROD_CONN_WRITE_SET = (1 << 7)
} prod_conn_flags;
Flags for a prod_conn.
| PROD_CONN_PERSISTENT | the connection is persistent. |
| PROD_CONN_RETRY_READ | a BIO read operation should be performed when the connection gets ready for writing. |
| PROD_CONN_RETRY_WRITE | a BIO write operation should be performed when the connection gets ready for reading. |
| PROD_CONN_RECONNECT | a reconnect is scheduled for the connection. |
| PROD_CONN_DO_RUNQUEUE | prod_conn_msgqueue_run() should be called if there is free space available in the output buffer. |
| PROD_CONN_FLUSHED | the output buffers are empty. |
| PROD_CONN_READ_SET | the socket is set up to receive MON_IO_READ events. |
| PROD_CONN_WRITE_SET | the socket is set up to receive MON_IO_WRITE events. |
typedef struct {
char *url;
mon_addrlist *addr;
GList **addr_iterator;
char *auth_method;
void *creds;
} prod_outgoing_data;
Extra data for outgoing connections.
| char *url; | target URL. |
| mon_addrlist *addr; | target address(es). |
| GList **addr_iterator; | used for iterating addr if the target URL resolves to multiple addresses. |
| char *auth_method; | authentication method to use. |
| void *creds; | credentials to use. |
int prod_add_listener (const char *url);
Creates a new listener socket on the address/port specified by the URL.
| url : | the URL to listen on. |
| Returns : | 0 if successful or an error code. |
prod_conn* prod_conn_new (prod_proto_module *proto);
Allocates a new prod_conn structure.
| proto : | the protocol module to use. |
| Returns : | a new prod_conn or NULL if out of memory. |
void prod_conn_done (prod_conn *conn);
Decrements the reference count of a prod_conn. If the reference count reaches zero, the connection is closed and all associated resources are deallocated.
| conn : | a prod_conn. |
void prod_conn_ref (prod_conn *conn);
Increases the reference count of a prod_conn.
| conn : | a prod_conn. |
int prod_conn_send (prod_conn *conn, prod_msg *msg);
Sends a message to a prod_conn.
int prod_conn_send_caps (prod_conn *conn);
Sends producer capabilities to a prod_conn.
| conn : | a prod_conn. |
| Returns : | 0 if successful or an error code. |
int prod_conn_send_metric (prod_conn *conn, uint32_t metric_id, prod_metric_value *mv);
Sends a metric value to a prod_conn.
| conn : | a prod_conn. |
| metric_id : | numeric metric id. |
| mv : | a prod_metric_value. |
| Returns : | 0 if successful or an error code. |
int prod_conn_send_response (prod_conn *conn, prod_cmd *cmd, uint32_t status, uint32_t metric_id);
Sends a command response to a prod_conn.
int prod_conn_send_data (prod_conn *conn, uint32_t metric_id, const char *data, uint32_t len);
Sends opaque data to a prod_conn.
| conn : | a prod_conn. |
| metric_id : | numeric metric id. |
| data : | opaque data to send. |
| len : | the length of data. |
| Returns : | 0 if successful or an error code. |
int prod_conn_send_wrap (prod_conn *conn);
Sends a virtual message to a prod_conn that is used for synchronizing the input and output buffers.
| conn : | a prod_conn. |
| Returns : | 0 if successful or an error code. |
void prod_conn_msg_del (prod_conn *conn, uint32_t metric_id);
Deletes all messages from the connection's message queue that have the specified metric id.
| conn : | a prod_conn. |
| metric_id : | the metric id to delete. |
void prod_conn_mid_req_add (prod_conn *conn, uint32_t id);
Marks a metric id as requested. If a metric value with this metric id arrives, it will be sent immediately to the consumer and the request flag is removed.
| conn : | a prod_conn. |
| id : | the requested metric id. |
void prod_conn_mid_req_del (prod_conn *conn, uint32_t id);
Removes a metric id from the set of requested ids.
| conn : | a prod_conn. |
| id : | the requested metric id. |
void prod_conn_add_killed (prod_conn *conn, uint32_t id);
Adds a metric id to the list of killed ids.
| conn : | a prod_conn. |
| id : | the id of a killed metric. |
void prod_conn_del_killed (prod_conn *conn, uint32_t id);
Removes a metric id from the list of killed ids.
| conn : | a prod_conn. |
| id : | the id of a killed metric. |
int prod_conn_is_killed (prod_conn *conn, uint32_t id);
Checks if a given metric id is on the list of killed ids.
| conn : | a prod_conn. |
| id : | a metric id. |
| Returns : | TRUE if the id is on the list of killed ids, FALSE otherwise. |
void prod_conn_barrier_add (prod_conn *conn, uint32_t id);
Marks a metric id as blocked.
| conn : | a prod_conn. |
| id : | a metric id to block. |
void prod_conn_barrier_del (prod_conn *conn, uint32_t id);
Removes the blocking of a metric id. If the connection's message queue contains messages with this id, prod_conn_msgqueue_run() is called to try to send them.
| conn : | a prod_conn. |
| id : | a metric id to unblock. |
prod_conn* prod_conn_incoming_new (prod_proto_module *proto, int sd, struct sockaddr *peer_addr, unsigned int addrlen);
Allocates a new incoming connection.
| proto : | the prod_proto_module to use. |
| sd : | the socket descriptor to use. |
| peer_addr : | the network address of the peer. |
| addrlen : | the length of peer_addr. |
| Returns : | a prod_conn or NULL if out of memory. |
prod_conn* prod_conn_outgoing_new (prod_proto_module *proto, prod_outgoing_data *odata);
Allocates a new outgoing connection.
| proto : | the prod_proto_module to use. |
| odata : | prod_outgoing_data needed for connecting. |
| Returns : | a prod_conn or NULL if out of memory. |
int prod_conn_outgoing_open (prod_conn *conn);
Opens an outgoing connection.
| conn : | a prod_conn. |
| Returns : | 0 if successful or an error code. |
| << Command Processing | Producer API Core Functions >> |