[collectd] [PATCH] postgresql plugin: Added support for custom queries.

Sebastian Harl sh at tokkee.org
Fri Jul 18 21:37:23 CEST 2008


The user may now define and use custom queries to collect data. A query is
defined by specifying the SQL query to execute and a definition of the data
type of each result column:

  <Query magic>
    Query "SELECT magic FROM wizard;"
    Column gauge magic
  </Query>

The "Column" configuration option specifies the type name and optional type
instance: Column <type> [<type_instance>]. The number and order of the
"Column" option has to match the columns of the query result.

A query is activated by adding the configuration option "Query <name>" to the
appropriate "<Database>" configuration blocks. A query may be used multiple
times.

Signed-off-by: Sebastian Harl <sh at tokkee.org>
---
 src/postgresql.c |  263 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 261 insertions(+), 2 deletions(-)

diff --git a/src/postgresql.c b/src/postgresql.c
index e477348..6a270d8 100644
--- a/src/postgresql.c
+++ b/src/postgresql.c
@@ -73,10 +73,27 @@
 	port
 
 typedef struct {
+	char *type;
+	char *type_instance;
+	int   ds_type;
+} c_psql_col_t;
+
+typedef struct {
+	char *name;
+	char *query;
+
+	c_psql_col_t *cols;
+	int           cols_num;
+} c_psql_query_t;
+
+typedef struct {
 	PGconn      *conn;
 	c_complain_t conn_complaint;
 
 	/* user configuration */
+	c_psql_query_t **queries;
+	int              queries_num;
+
 	char *host;
 	char *port;
 	char *database;
@@ -90,9 +107,58 @@ typedef struct {
 	char *service;
 } c_psql_database_t;
 
+static c_psql_query_t *queries          = NULL;
+static int             queries_num      = 0;
+
 static c_psql_database_t *databases     = NULL;
 static int                databases_num = 0;
 
+static c_psql_query_t *c_psql_query_new (const char *name)
+{
+	c_psql_query_t *query;
+
+	++queries_num;
+	if (NULL == (queries = (c_psql_query_t *)realloc (queries,
+				queries_num * sizeof (*queries)))) {
+		log_err ("Out of memory.");
+		exit (5);
+	}
+	query = queries + queries_num - 1;
+
+	query->name  = sstrdup (name);
+	query->query = NULL;
+
+	query->cols     = NULL;
+	query->cols_num = 0;
+	return query;
+} /* c_psql_query_new */
+
+static void c_psql_query_delete (c_psql_query_t *query)
+{
+	int i;
+
+	sfree (query->name);
+	sfree (query->query);
+
+	for (i = 0; i < query->cols_num; ++i) {
+		sfree (query->cols[i].type);
+		sfree (query->cols[i].type_instance);
+	}
+	sfree (query->cols);
+	query->cols_num = 0;
+	return;
+} /* c_psql_query_delete */
+
+static c_psql_query_t *c_psql_query_get (const char *name)
+{
+	int i;
+
+	for (i = 0; i < queries_num; ++i)
+		if (0 == strcasecmp (name, queries[i].name))
+			return queries + i;
+	return NULL;
+} /* c_psql_query_get */
+
 static c_psql_database_t *c_psql_database_new (const char *name)
 {
 	c_psql_database_t *db;
@@ -111,6 +177,9 @@ static c_psql_database_t *c_psql_database_new (const char *name)
 	db->conn_complaint.last     = 0;
 	db->conn_complaint.interval = 0;
 
+	db->queries     = NULL;
+	db->queries_num = 0;
+
 	db->database   = sstrdup (name);
 	db->host       = NULL;
 	db->port       = NULL;
@@ -129,6 +198,9 @@ static void c_psql_database_delete (c_psql_database_t *db)
 {
 	PQfinish (db->conn);
 
+	sfree (db->queries);
+	db->queries_num = 0;
+
 	sfree (db->database);
 	sfree (db->host);
 	sfree (db->port);
@@ -224,6 +296,58 @@ static int c_psql_check_connection (c_psql_database_t *db)
 	return 0;
 } /* c_psql_check_connection */
 
+static int c_psql_exec_query (c_psql_database_t *db, int idx)
+{
+	c_psql_query_t *query;
+	PGresult       *res;
+
+	int rows, cols;
+	int i;
+
+	if (idx >= db->queries_num)
+		return -1;
+
+	query = db->queries[idx];
+
+	res = PQexec (db->conn, query->query);
+
+	if (PGRES_TUPLES_OK != PQresultStatus (res)) {
+		log_err ("Failed to execute SQL query: %s",
+				PQerrorMessage (db->conn));
+		log_info ("SQL query was: %s", query->query);
+		PQclear (res);
+		return -1;
+	}
+
+	rows = PQntuples (res);
+	if (1 > rows)
+		return 0;
+
+	cols = PQnfields (res);
+	if (query->cols_num != cols) {
+		log_err ("SQL query returned wrong number of fields "
+				"(expected: %i, got: %i)", query->cols_num, cols);
+		log_info ("SQL query was: %s", query->query);
+		return -1;
+	}
+
+	for (i = 0; i < rows; ++i) {
+		int j;
+
+		for (j = 0; j < cols; ++j) {
+			c_psql_col_t col = query->cols[j];
+
+			char *value = PQgetvalue (res, i, j);
+
+			if (col.ds_type == DS_TYPE_COUNTER)
+				submit_counter (db, col.type, col.type_instance, value);
+			else if (col.ds_type == DS_TYPE_GAUGE)
+				submit_gauge (db, col.type, col.type_instance, value);
+		}
+	}
+	return 0;
+} /* c_psql_exec_query */
+
 static int c_psql_stat_database (c_psql_database_t *db)
 {
 	const char *const query =
@@ -368,6 +492,8 @@ static int c_psql_read (void)
 	for (i = 0; i < databases_num; ++i) {
 		c_psql_database_t *db = databases + i;
 
+		int j;
+
 		assert (NULL != db->database);
 
 		if (0 != c_psql_check_connection (db))
@@ -377,6 +503,9 @@ static int c_psql_read (void)
 		c_psql_stat_user_tables (db);
 		c_psql_statio_user_tables (db);
 
+		for (j = 0; j < db->queries_num; ++j)
+			c_psql_exec_query (db, j);
+
 		++success;
 	}
 
@@ -402,6 +531,14 @@ static int c_psql_shutdown (void)
 
 	sfree (databases);
 	databases_num = 0;
+
+	for (i = 0; i < queries_num; ++i) {
+		c_psql_query_t *query = queries + i;
+		c_psql_query_delete (query);
+	}
+
+	sfree (queries);
+	queries_num = 0;
 	return 0;
 } /* c_psql_shutdown */
 
@@ -412,6 +549,33 @@ static int c_psql_init (void)
 	if ((NULL == databases) || (0 == databases_num))
 		return 0;
 
+	for (i = 0; i < queries_num; ++i) {
+		c_psql_query_t *query = queries + i;
+		int j;
+
+		for (j = 0; j < query->cols_num; ++j) {
+			c_psql_col_t     *col = query->cols + j;
+			const data_set_t *ds;
+
+			ds = plugin_get_ds (col->type);
+			if (NULL == ds) {
+				log_err ("Column: Unknown type \"%s\".", col->type);
+				c_psql_shutdown ();
+				return -1;
+			}
+
+			if (1 != ds->ds_num) {
+				log_err ("Column: Invalid type \"%s\" - types defining "
+						"one data source are supported only (got: %i).",
+						col->type, ds->ds_num);
+				c_psql_shutdown ();
+				return -1;
+			}
+
+			col->ds_type = ds->ds[0].type;
+		}
+	}
+
 	for (i = 0; i < databases_num; ++i) {
 		c_psql_database_t *db = databases + i;
 
@@ -447,7 +611,7 @@ static int c_psql_init (void)
 				"at server %s%s%s (server version: %d.%d.%d, "
 				"protocol version: %d, pid: %d)",
 				PQdb (db->conn), PQuser (db->conn),
-				C_PSQL_SOCKET3(server_host, PQport (db->conn)),
+				C_PSQL_SOCKET3 (server_host, PQport (db->conn)),
 				C_PSQL_SERVER_VERSION3 (server_version),
 				PQprotocolVersion (db->conn), PQbackendPID (db->conn));
 	}
@@ -470,6 +634,97 @@ static int config_set (char *name, char **var, const oconfig_item_t *ci)
 	return 0;
 } /* config_set */
 
+static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci)
+{
+	c_psql_col_t *col;
+
+	int i;
+
+	if ((0 != ci->children_num)
+			|| (1 > ci->values_num) || (2 < ci->values_num)) {
+		log_err ("Column expects either one or two arguments.");
+		return 1;
+	}
+
+	for (i = 0; i < ci->values_num; ++i) {
+		if (OCONFIG_TYPE_STRING != ci->values[i].type) {
+			log_err ("Column expects either one or two string arguments.");
+			return 1;
+		}
+	}
+
+	++query->cols_num;
+	if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols,
+				query->cols_num * sizeof (*query->cols)))) {
+		log_err ("Out of memory.");
+		exit (5);
+	}
+
+	col = query->cols + query->cols_num - 1;
+
+	col->ds_type = -1;
+
+	col->type = sstrdup (ci->values[0].value.string);
+	col->type_instance = (2 == ci->values_num)
+		? sstrdup (ci->values[1].value.string) : NULL;
+	return 0;
+} /* config_set_column */
+
+static int config_set_query (c_psql_database_t *db, const oconfig_item_t *ci)
+{
+	c_psql_query_t *query;
+
+	if ((0 != ci->children_num) || (1 != ci->values_num)
+			|| (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+		log_err ("Query expects a single string argument.");
+		return 1;
+	}
+
+	query = c_psql_query_get (ci->values[0].value.string);
+	if (NULL == query) {
+		log_err ("Query \"%s\" not found - please check your configuration.",
+				ci->values[0].value.string);
+		return 1;
+	}
+
+	++db->queries_num;
+	if (NULL == (db->queries = (c_psql_query_t **)realloc (db->queries,
+				db->queries_num * sizeof (*db->queries)))) {
+		log_err ("Out of memory.");
+		exit (5);
+	}
+
+	db->queries[db->queries_num - 1] = query;
+	return 0;
+} /* config_set_query */
+
+static int c_psql_config_query (oconfig_item_t *ci)
+{
+	c_psql_query_t *query;
+
+	int i;
+
+	if ((1 != ci->values_num)
+			|| (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+		log_err ("<Query> expects a single string argument.");
+		return 1;
+	}
+
+	query = c_psql_query_new (ci->values[0].value.string);
+
+	for (i = 0; i < ci->children_num; ++i) {
+		oconfig_item_t *c = ci->children + i;
+
+		if (0 == strcasecmp (c->key, "Query"))
+			config_set ("Query", &query->query, c);
+		else if (0 == strcasecmp (c->key, "Column"))
+			config_set_column (query, c);
+		else
+			log_warn ("Ignoring unknown config key \"%s\".", c->key);
+	}
+	return 0;
+} /* c_psql_config_query */
+
 static int c_psql_config_database (oconfig_item_t *ci)
 {
 	c_psql_database_t *db;
@@ -501,6 +756,8 @@ static int c_psql_config_database (oconfig_item_t *ci)
 			config_set ("KRBSrvName", &db->krbsrvname, c);
 		else if (0 == strcasecmp (c->key, "Service"))
 			config_set ("Service", &db->service, c);
+		else if (0 == strcasecmp (c->key, "Query"))
+			config_set_query (db, c);
 		else
 			log_warn ("Ignoring unknown config key \"%s\".", c->key);
 	}
@@ -514,7 +771,9 @@ static int c_psql_config (oconfig_item_t *ci)
 	for (i = 0; i < ci->children_num; ++i) {
 		oconfig_item_t *c = ci->children + i;
 
-		if (0 == strcasecmp (c->key, "Database"))
+		if (0 == strcasecmp (c->key, "Query"))
+			c_psql_config_query (c);
+		else if (0 == strcasecmp (c->key, "Database"))
 			c_psql_config_database (c);
 		else
 			log_warn ("Ignoring unknown config key \"%s\".", c->key);
-- 
1.5.6.1.156.ge903b

-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
Url : http://mailman.verplant.org/pipermail/collectd/attachments/20080718/c5144bf5/attachment-0001.pgp 


More information about the collectd mailing list