Connections

Connections — Management of network connections

Synopsis


#include <monitor/producer/connection.h>


            prod_msg;
enum        prod_msg_class;
            prod_msglist;
            prod_msgqueue;
            prod_conn;
enum        prod_conn_type;
enum        prod_conn_state;
enum        prod_conn_flags;
            prod_outgoing_data;

int         prod_add_listener               (const char *url);
prod_conn*  prod_conn_new                   (prod_proto_module *proto);
void        prod_conn_done                  (prod_conn *conn);
void        prod_conn_ref                   (prod_conn *conn);
void        prod_conn_close                 (prod_conn *conn);
int         prod_conn_send                  (prod_conn *conn,
                                             prod_msg *msg);
int         prod_conn_send_caps             (prod_conn *conn);
int         prod_conn_send_metric           (prod_conn *conn,
                                             uint32_t metric_id,
                                             prod_metric_value *mv);
int         prod_conn_send_response         (prod_conn *conn,
                                             prod_cmd *cmd,
                                             uint32_t status,
                                             uint32_t metric_id);
int         prod_conn_send_data             (prod_conn *conn,
                                             uint32_t metric_id,
                                             const char *data,
                                             uint32_t len);
int         prod_conn_send_wrap             (prod_conn *conn);
void        prod_conn_msg_del               (prod_conn *conn,
                                             uint32_t metric_id);
void        prod_conn_mid_req_add           (prod_conn *conn,
                                             uint32_t id);
void        prod_conn_mid_req_del           (prod_conn *conn,
                                             uint32_t id);
void        prod_conn_add_killed            (prod_conn *conn,
                                             uint32_t id);
void        prod_conn_del_killed            (prod_conn *conn,
                                             uint32_t id);
int         prod_conn_is_killed             (prod_conn *conn,
                                             uint32_t id);
void        prod_conn_barrier_add           (prod_conn *conn,
                                             uint32_t id);
void        prod_conn_barrier_del           (prod_conn *conn,
                                             uint32_t id);
prod_conn*  prod_conn_incoming_new          (prod_proto_module *proto,
                                             int sd,
                                             struct sockaddr *peer_addr,
                                             unsigned int addrlen);
prod_conn*  prod_conn_outgoing_new          (prod_proto_module *proto,
                                             prod_outgoing_data *odata);
int         prod_conn_outgoing_open         (prod_conn *conn);
int         prod_conn_outgoing_handshake    (prod_conn *conn);

Description

Details

prod_msg

typedef struct {
	/* Message format */
	mon_format			format : 16;
	/* Message delivery class */
	prod_msg_class			msg_class : 16;
	/* Metric ID */
	uint32_t			id;
	/* Pointer to the next message in the queue */
	prod_msg			*next;
	/* Message contents */
	union
	{
		mon_msg_cmd_response	resp;
		prod_metric_value	*mv;
		mon_msg_producer_caps	caps;
		mon_msg_data		data;
	};
} prod_msg;

Represents a message in the wait queue of a connection.

mon_format format : 16;format of the message.
prod_msg_class msg_class : 16;the class of the message.
uint32_t id;the metric ID of the message.
prod_msg *next;pointer to the next message in the queue.
mon_msg_cmd_response resp;a mon_msg_cmd_response.
prod_metric_value *mv;a prod_metric_value.
mon_msg_producer_caps caps;a mon_msg_producer_caps.
mon_msg_data data;a mon_msg_data.

enum prod_msg_class

typedef enum {
	/* Do not send automatically. */
	PROD_MSG_DEFER,
	/* Should be sent immediately, regardless of conn->barrier. */
	PROD_MSG_IMMED,
	/* Should be sent when conn->barrier is not set. */
	PROD_MSG_SUBSCRIBED,
	/* Marks a barrier for WRAP. */
	PROD_MSG_WRAP
} prod_msg_class;

Message classes for messages in the wait queue.

PROD_MSG_DEFER: deferred until explicitly requested by a GET command.
PROD_MSG_IMMED: sent immediately, ahead of any PROD_MSG_DEFER or PROD_MSG_SUBSCRIBED messages.
PROD_MSG_SUBSCRIBED: sent as soon as possible unless it is explicitly blocked.
PROD_MSG_WRAP: virtual message for synchronization needed by the WRAP command.

prod_msglist

typedef struct {
	/* The first element in the list */
	prod_msg		*head;
	/* The last element in the list */
	prod_msg		*tail;
	/* Number of messages that were dropped */
	unsigned int		dropped;
} prod_msglist;

Head of a list containing prod_msg entries.

prod_msg *head;the first element of the list.
prod_msg *tail;the last element of the list.
unsigned int dropped;number of messages belonging to this message id that were dropped since the last MON_MID_DROPPED message was sent.

prod_msgqueue

typedef struct {
	/* The first message that is ready to send */
	prod_msg		*head;
	/* The last message that is ready to send */
	prod_msg		*tail;

	/* Messages that are either DEFERred or blocked. Key is the
	 * message id, value is a prod_msglist */
	GHashTable		*pending;

	/* Total number of messages in the queue */
	unsigned int		total;
	/* Max. number of messages */
	unsigned int		limit;
	/* Number of messages that was dropped since the queue became full.
	 * It is the sum of the dropped fields of the message lists in the
	 * pending table */
	unsigned int		dropped;
} prod_msgqueue;

Describes the message queue for a prod_conn.

prod_msg *head;the first element of the "ready" list.
prod_msg *tail;the last element of the "ready" list.
GHashTable *pending;the table of pending messages. Key is the message ID, the value is a prod_msglist structure.
unsigned int total;total number of messages in the queue ("ready" and "pending" combined).
unsigned int limit;max. number of messages in the queue. If more messages arrive, they will be dropped.
unsigned int dropped;number of messages dropped since the queue became full. When the queue is no longer full, a MON_MID_DROPPED message is sent to the consumer and this counter is reset to 0.

prod_conn

typedef struct {
	prod_conn_type		type : 8;
	prod_conn_state		state : 8;
	prod_conn_flags		flags : 16;

	mon_bio_head		bh;
	mon_addrlist		*peer_addr;
	prod_outgoing_data	*outgoing;
	mon_bio			*pending_bio;

	mon_auth_creds		*auth_creds;
	mon_auth_state		*auth_state;

	prod_proto_module	*proto;
	prod_proto_state	*proto_state;
	prod_user		*user;

	prod_conn		*parent;

	/* Metric IDs for which a get command is in effect */
	GHashTable		*mid_req;
	/* Metric IDs that should be held back */
	GHashTable		*barrier;
	/* Metric IDs subscribed to this connection */
	GHashTable		*subscribed;
	/* Metric IDs killed but not yet STOPped */
	GList			*killed_ids;
	/* List of pending commands */
	GList			*pending_cmds;

	/* Messages waiting for encoding by the protocol module */
	prod_msgqueue		msgqueue;

	uint32_t		id;
	int			refcnt;
} prod_conn;

Represents a connection to a consumer.

prod_conn_type type : 8;prod_conn_type.
prod_conn_state state : 8;prod_conn_state.
prod_conn_flags flags : 16;prod_conn_flags.
mon_bio_head bh;a mon_bio_head used for I/O.
mon_addrlist *peer_addr;the network address of the peer.
prod_outgoing_data *outgoing;extra data for outgoing connections.
mon_bio *pending_bio;BIO pending for a WRAP command.
mon_auth_creds *auth_creds;authentication credentials.
mon_auth_state *auth_state;authentication state.
prod_proto_module *proto;protocol module.
prod_proto_state *proto_state;the internal protocol state.
prod_user *user;a prod_user this connection belongs to.
prod_conn *parent;the prod_conn that created this connection.
GHashTable *mid_req;hash table for metric IDs requested by GET commands.
GHashTable *barrier;hash table for metric IDs that have to be temporarily blocked.
GHashTable *subscribed;hash table for metric IDs that are subscribed to this connection.
GList *killed_ids;metric IDs that were killed but have not been STOPped yet.
GList *pending_cmds;list of pending commands.
prod_msgqueue msgqueue;message queue.
uint32_t id;channel id.
int refcnt;reference count.

enum prod_conn_type

typedef enum {
	PROD_CHANNEL_INBOUND,	/* consumer-initiated */
	PROD_CHANNEL_OUTBOUND	/* producer-initiated */
} prod_conn_type;

Type of a prod_conn.

PROD_CHANNEL_INBOUND: this is an inbound connection.
PROD_CHANNEL_OUTBOUND: this is an outbound connection.

enum prod_conn_state

typedef enum {
	PROD_STATE_CONNECTING,	/* connect() in progress */
	PROD_STATE_HANDSHAKE,	/* BIO handshake in progress */
	PROD_STATE_NOTAUTH,	/* Open but not yet authenticated */
	PROD_STATE_READY,	/* Authenticated and ready */
	PROD_STATE_WRAP,	/* Processing a WRAP command */
	PROD_STATE_CLOSING,	/* The connection is closing */
	PROD_STATE_CLOSED	/* The connection is closed */
} prod_conn_state;

State of a prod_conn.

PROD_STATE_CONNECTING: connect() is in progress.
PROD_STATE_HANDSHAKE: BIO handshake is in progress.
PROD_STATE_NOTAUTH: the network connection is ready but authentication has not been performed yet.
PROD_STATE_READY: the connection is authenticated and ready to receive commands or send data.
PROD_STATE_WRAP: a WRAP command is being processed.
PROD_STATE_CLOSING: the connection is closing.
PROD_STATE_CLOSED: the network connection is closed.

enum prod_conn_flags

typedef enum {
	/* Marks a persistent connection */
	PROD_CONN_PERSISTENT	= (1 << 0),
	/* If set, a read operation should be performed when the socket is
	 * available for writing */
	PROD_CONN_RETRY_READ	= (1 << 1),
	/* If set, a write operation should be performed when the socket is
	 * available for reading */
	PROD_CONN_RETRY_WRITE	= (1 << 2),
	/* If set, a reconnect is scheduled for the connection */
	PROD_CONN_RECONNECT	= (1 << 3),
	/* If set, call msgqueue_run() when there is free space in the buffer */
	PROD_CONN_DO_RUNQUEUE	= (1 << 4),
	/* The output buffers are completely flushed */
	PROD_CONN_FLUSHED	= (1 << 5),
	/* MON_IO_READ is set on the socket descriptor */
	PROD_CONN_READ_SET	= (1 << 6),
	/* MON_IO_WRITE is set on the socket descriptor */
	PROD_CONN_WRITE_SET	= (1 << 7)
} prod_conn_flags;

Flags for a prod_conn.

PROD_CONN_PERSISTENT: the connection is persistent.
PROD_CONN_RETRY_READ: a BIO read operation should be performed when the connection gets ready for writing.
PROD_CONN_RETRY_WRITE: a BIO write operation should be performed when the connection gets ready for reading.
PROD_CONN_RECONNECT: a reconnect is scheduled for the connection.
PROD_CONN_DO_RUNQUEUE: prod_conn_msgqueue_run() should be called if there is free space available in the output buffer.
PROD_CONN_FLUSHED: the output buffers are empty.
PROD_CONN_READ_SET: the socket is set up to receive MON_IO_READ events.
PROD_CONN_WRITE_SET: the socket is set up to receive MON_IO_WRITE events.

prod_outgoing_data

typedef struct {
	char			*url;
	mon_addrlist		*addr;
	GList			**addr_iterator;
	char			*auth_method;
	void			*creds;
} prod_outgoing_data;

Extra data for outgoing connections.

char *url;target URL.
mon_addrlist *addr;target address(es).
GList **addr_iterator;used for iterating addr if the target URL resolves to multiple addresses.
char *auth_method;authentication method to use.
void *creds;credentials to use.

prod_add_listener ()

int         prod_add_listener               (const char *url);

Creates a new listener socket on the address/port specified by the URL.

url :the URL to listen on.
Returns :0 if successful or an error code.

prod_conn_new ()

prod_conn*  prod_conn_new                   (prod_proto_module *proto);

Allocates a new prod_conn structure.

proto :the protocol module to use.
Returns :a new prod_conn or NULL if out of memory.

prod_conn_done ()

void        prod_conn_done                  (prod_conn *conn);

Decrements the reference count of a prod_conn. If the reference count reaches zero, the connection is closed and all associated resources are deallocated.

conn :a prod_conn.

prod_conn_ref ()

void        prod_conn_ref                   (prod_conn *conn);

Increases the reference count of a prod_conn.

conn :a prod_conn.

prod_conn_close ()

void        prod_conn_close                 (prod_conn *conn);

Closes a prod_conn.

conn :a prod_conn.

prod_conn_send ()

int         prod_conn_send                  (prod_conn *conn,
                                             prod_msg *msg);

Sends a message to a prod_conn.

conn :a prod_conn.
msg :a prod_msg.
Returns :0 if successful or an error code.

prod_conn_send_caps ()

int         prod_conn_send_caps             (prod_conn *conn);

Sends producer capabilities to a prod_conn.

conn :a prod_conn.
Returns :0 if successful or an error code.

prod_conn_send_metric ()

int         prod_conn_send_metric           (prod_conn *conn,
                                             uint32_t metric_id,
                                             prod_metric_value *mv);

Sends a metric value to a prod_conn.

conn :a prod_conn.
metric_id :numeric metric id.
mv :a prod_metric_value.
Returns :0 if successful or an error code.

prod_conn_send_response ()

int         prod_conn_send_response         (prod_conn *conn,
                                             prod_cmd *cmd,
                                             uint32_t status,
                                             uint32_t metric_id);

Sends a command response to a prod_conn.

conn :a prod_conn.
cmd :a prod_cmd.
status :the command's status.
metric_id :optional metric id.
Returns :0 if successful or an error code.

prod_conn_send_data ()

int         prod_conn_send_data             (prod_conn *conn,
                                             uint32_t metric_id,
                                             const char *data,
                                             uint32_t len);

Sends opaque data to a prod_conn.

conn :a prod_conn.
metric_id :numeric metric id.
data :opaque data to send.
len :the length of data.
Returns :0 if successful or an error code.

prod_conn_send_wrap ()

int         prod_conn_send_wrap             (prod_conn *conn);

Sends a virtual message to a prod_conn that is used for synchronizing the input and output buffers.

conn :a prod_conn.
Returns :0 if successful or an error code.

prod_conn_msg_del ()

void        prod_conn_msg_del               (prod_conn *conn,
                                             uint32_t metric_id);

Deletes all messages from the connection's message queue that have the specified metric id.

conn :a prod_conn.
metric_id :the metric id to delete.

prod_conn_mid_req_add ()

void        prod_conn_mid_req_add           (prod_conn *conn,
                                             uint32_t id);

Marks a metric id as requested. When a metric value with this metric id arrives, it is sent immediately to the consumer and the request flag is removed.

conn :a prod_conn.
id :the requested metric id.

prod_conn_mid_req_del ()

void        prod_conn_mid_req_del           (prod_conn *conn,
                                             uint32_t id);

Removes a metric id from the set of requested ids.

conn :a prod_conn.
id :the requested metric id.

prod_conn_add_killed ()

void        prod_conn_add_killed            (prod_conn *conn,
                                             uint32_t id);

Adds a metric id to the list of killed ids.

conn :a prod_conn.
id :the id of a killed metric.

prod_conn_del_killed ()

void        prod_conn_del_killed            (prod_conn *conn,
                                             uint32_t id);

Removes a metric id from the list of killed ids.

conn :a prod_conn.
id :the id of a killed metric.

prod_conn_is_killed ()

int         prod_conn_is_killed             (prod_conn *conn,
                                             uint32_t id);

Checks if a given metric id is on the list of killed ids.

conn :a prod_conn.
id :a metric id.
Returns :TRUE if the id is on the list of killed ids, FALSE otherwise.

prod_conn_barrier_add ()

void        prod_conn_barrier_add           (prod_conn *conn,
                                             uint32_t id);

Marks a metric id as blocked.

conn :a prod_conn.
id :a metric id to block.

prod_conn_barrier_del ()

void        prod_conn_barrier_del           (prod_conn *conn,
                                             uint32_t id);

Removes the blocking of a metric id. If the connection's message queue contains messages with this id, prod_conn_msgqueue_run() is called to try to send them.

conn :a prod_conn.
id :a metric id to unblock.

prod_conn_incoming_new ()

prod_conn*  prod_conn_incoming_new          (prod_proto_module *proto,
                                             int sd,
                                             struct sockaddr *peer_addr,
                                             unsigned int addrlen);

Allocates a new incoming connection.

proto :the prod_proto_module to use.
sd :the socket descriptor to use.
peer_addr :the network address of the peer.
addrlen :the length of peer_addr.
Returns :a prod_conn or NULL if out of memory.

prod_conn_outgoing_new ()

prod_conn*  prod_conn_outgoing_new          (prod_proto_module *proto,
                                             prod_outgoing_data *odata);

Allocates a new outgoing connection.

proto :the prod_proto_module to use.
odata :prod_outgoing_data needed for connecting.
Returns :a prod_conn or NULL if out of memory.

prod_conn_outgoing_open ()

int         prod_conn_outgoing_open         (prod_conn *conn);

Opens an outgoing connection.

conn :a prod_conn.
Returns :0 if successful or an error code.

prod_conn_outgoing_handshake ()

int         prod_conn_outgoing_handshake    (prod_conn *conn);

Performs BIO handshake for an outgoing connection.

conn :a prod_conn.
Returns :0 if successful or an error code.