[collectd] [PATCH] postgresql plugin: Added support for custom queries.
Sebastian Harl
sh at tokkee.org
Fri Jul 18 21:37:23 CEST 2008
The user may now define and use custom queries to collect data. A query is
defined by specifying the SQL query to execute and a definition of the data
type of each result column:
<Query magic>
Query "SELECT magic FROM wizard;"
Column gauge magic
</Query>
The "Column" configuration option specifies the type name and optional type
instance: Column <type> [<type_instance>]. The number and order of the
"Column" option has to match the columns of the query result.
A query is activated by adding the configuration option "Query <name>" to the
appropriate "<Database>" configuration blocks. A query may be used multiple
times.
Signed-off-by: Sebastian Harl <sh at tokkee.org>
---
src/postgresql.c | 263 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 261 insertions(+), 2 deletions(-)
diff --git a/src/postgresql.c b/src/postgresql.c
index e477348..6a270d8 100644
--- a/src/postgresql.c
+++ b/src/postgresql.c
@@ -73,10 +73,27 @@
port
typedef struct {
+ char *type;
+ char *type_instance;
+ int ds_type;
+} c_psql_col_t;
+
+typedef struct {
+ char *name;
+ char *query;
+
+ c_psql_col_t *cols;
+ int cols_num;
+} c_psql_query_t;
+
+typedef struct {
PGconn *conn;
c_complain_t conn_complaint;
/* user configuration */
+ c_psql_query_t **queries;
+ int queries_num;
+
char *host;
char *port;
char *database;
@@ -90,9 +107,58 @@ typedef struct {
char *service;
} c_psql_database_t;
+static c_psql_query_t *queries = NULL;
+static int queries_num = 0;
+
static c_psql_database_t *databases = NULL;
static int databases_num = 0;
+static c_psql_query_t *c_psql_query_new (const char *name)
+{
+ c_psql_query_t *query;
+
+ ++queries_num;
+ if (NULL == (queries = (c_psql_query_t *)realloc (queries,
+ queries_num * sizeof (*queries)))) {
+ log_err ("Out of memory.");
+ exit (5);
+ }
+ query = queries + queries_num - 1;
+
+ query->name = sstrdup (name);
+ query->query = NULL;
+
+ query->cols = NULL;
+ query->cols_num = 0;
+ return query;
+} /* c_psql_query_new */
+
+static void c_psql_query_delete (c_psql_query_t *query)
+{
+ int i;
+
+ sfree (query->name);
+ sfree (query->query);
+
+ for (i = 0; i < query->cols_num; ++i) {
+ sfree (query->cols[i].type);
+ sfree (query->cols[i].type_instance);
+ }
+ sfree (query->cols);
+ query->cols_num = 0;
+ return;
+} /* c_psql_query_delete */
+
+static c_psql_query_t *c_psql_query_get (const char *name)
+{
+ int i;
+
+ for (i = 0; i < queries_num; ++i)
+ if (0 == strcasecmp (name, queries[i].name))
+ return queries + i;
+ return NULL;
+} /* c_psql_query_get */
+
static c_psql_database_t *c_psql_database_new (const char *name)
{
c_psql_database_t *db;
@@ -111,6 +177,9 @@ static c_psql_database_t *c_psql_database_new (const char *name)
db->conn_complaint.last = 0;
db->conn_complaint.interval = 0;
+ db->queries = NULL;
+ db->queries_num = 0;
+
db->database = sstrdup (name);
db->host = NULL;
db->port = NULL;
@@ -129,6 +198,9 @@ static void c_psql_database_delete (c_psql_database_t *db)
{
PQfinish (db->conn);
+ sfree (db->queries);
+ db->queries_num = 0;
+
sfree (db->database);
sfree (db->host);
sfree (db->port);
@@ -224,6 +296,58 @@ static int c_psql_check_connection (c_psql_database_t *db)
return 0;
} /* c_psql_check_connection */
+static int c_psql_exec_query (c_psql_database_t *db, int idx)
+{
+ c_psql_query_t *query;
+ PGresult *res;
+
+ int rows, cols;
+ int i;
+
+ if (idx >= db->queries_num)
+ return -1;
+
+ query = db->queries[idx];
+
+ res = PQexec (db->conn, query->query);
+
+ if (PGRES_TUPLES_OK != PQresultStatus (res)) {
+ log_err ("Failed to execute SQL query: %s",
+ PQerrorMessage (db->conn));
+ log_info ("SQL query was: %s", query->query);
+ PQclear (res);
+ return -1;
+ }
+
+ rows = PQntuples (res);
+ if (1 > rows)
+ return 0;
+
+ cols = PQnfields (res);
+ if (query->cols_num != cols) {
+ log_err ("SQL query returned wrong number of fields "
+ "(expected: %i, got: %i)", query->cols_num, cols);
+ log_info ("SQL query was: %s", query->query);
+ return -1;
+ }
+
+ for (i = 0; i < rows; ++i) {
+ int j;
+
+ for (j = 0; j < cols; ++j) {
+ c_psql_col_t col = query->cols[j];
+
+ char *value = PQgetvalue (res, i, j);
+
+ if (col.ds_type == DS_TYPE_COUNTER)
+ submit_counter (db, col.type, col.type_instance, value);
+ else if (col.ds_type == DS_TYPE_GAUGE)
+ submit_gauge (db, col.type, col.type_instance, value);
+ }
+ }
+ return 0;
+} /* c_psql_exec_query */
+
static int c_psql_stat_database (c_psql_database_t *db)
{
const char *const query =
@@ -368,6 +492,8 @@ static int c_psql_read (void)
for (i = 0; i < databases_num; ++i) {
c_psql_database_t *db = databases + i;
+ int j;
+
assert (NULL != db->database);
if (0 != c_psql_check_connection (db))
@@ -377,6 +503,9 @@ static int c_psql_read (void)
c_psql_stat_user_tables (db);
c_psql_statio_user_tables (db);
+ for (j = 0; j < db->queries_num; ++j)
+ c_psql_exec_query (db, j);
+
++success;
}
@@ -402,6 +531,14 @@ static int c_psql_shutdown (void)
sfree (databases);
databases_num = 0;
+
+ for (i = 0; i < queries_num; ++i) {
+ c_psql_query_t *query = queries + i;
+ c_psql_query_delete (query);
+ }
+
+ sfree (queries);
+ queries_num = 0;
return 0;
} /* c_psql_shutdown */
@@ -412,6 +549,33 @@ static int c_psql_init (void)
if ((NULL == databases) || (0 == databases_num))
return 0;
+ for (i = 0; i < queries_num; ++i) {
+ c_psql_query_t *query = queries + i;
+ int j;
+
+ for (j = 0; j < query->cols_num; ++j) {
+ c_psql_col_t *col = query->cols + j;
+ const data_set_t *ds;
+
+ ds = plugin_get_ds (col->type);
+ if (NULL == ds) {
+ log_err ("Column: Unknown type \"%s\".", col->type);
+ c_psql_shutdown ();
+ return -1;
+ }
+
+ if (1 != ds->ds_num) {
+ log_err ("Column: Invalid type \"%s\" - types defining "
+ "one data source are supported only (got: %i).",
+ col->type, ds->ds_num);
+ c_psql_shutdown ();
+ return -1;
+ }
+
+ col->ds_type = ds->ds[0].type;
+ }
+ }
+
for (i = 0; i < databases_num; ++i) {
c_psql_database_t *db = databases + i;
@@ -447,7 +611,7 @@ static int c_psql_init (void)
"at server %s%s%s (server version: %d.%d.%d, "
"protocol version: %d, pid: %d)",
PQdb (db->conn), PQuser (db->conn),
- C_PSQL_SOCKET3(server_host, PQport (db->conn)),
+ C_PSQL_SOCKET3 (server_host, PQport (db->conn)),
C_PSQL_SERVER_VERSION3 (server_version),
PQprotocolVersion (db->conn), PQbackendPID (db->conn));
}
@@ -470,6 +634,97 @@ static int config_set (char *name, char **var, const oconfig_item_t *ci)
return 0;
} /* config_set */
+static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci)
+{
+ c_psql_col_t *col;
+
+ int i;
+
+ if ((0 != ci->children_num)
+ || (1 > ci->values_num) || (2 < ci->values_num)) {
+ log_err ("Column expects either one or two arguments.");
+ return 1;
+ }
+
+ for (i = 0; i < ci->values_num; ++i) {
+ if (OCONFIG_TYPE_STRING != ci->values[i].type) {
+ log_err ("Column expects either one or two string arguments.");
+ return 1;
+ }
+ }
+
+ ++query->cols_num;
+ if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols,
+ query->cols_num * sizeof (*query->cols)))) {
+ log_err ("Out of memory.");
+ exit (5);
+ }
+
+ col = query->cols + query->cols_num - 1;
+
+ col->ds_type = -1;
+
+ col->type = sstrdup (ci->values[0].value.string);
+ col->type_instance = (2 == ci->values_num)
+ ? sstrdup (ci->values[1].value.string) : NULL;
+ return 0;
+} /* config_set_column */
+
+static int config_set_query (c_psql_database_t *db, const oconfig_item_t *ci)
+{
+ c_psql_query_t *query;
+
+ if ((0 != ci->children_num) || (1 != ci->values_num)
+ || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+ log_err ("Query expects a single string argument.");
+ return 1;
+ }
+
+ query = c_psql_query_get (ci->values[0].value.string);
+ if (NULL == query) {
+ log_err ("Query \"%s\" not found - please check your configuration.",
+ ci->values[0].value.string);
+ return 1;
+ }
+
+ ++db->queries_num;
+ if (NULL == (db->queries = (c_psql_query_t **)realloc (db->queries,
+ db->queries_num * sizeof (*db->queries)))) {
+ log_err ("Out of memory.");
+ exit (5);
+ }
+
+ db->queries[db->queries_num - 1] = query;
+ return 0;
+} /* config_set_query */
+
+static int c_psql_config_query (oconfig_item_t *ci)
+{
+ c_psql_query_t *query;
+
+ int i;
+
+ if ((1 != ci->values_num)
+ || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+ log_err ("<Query> expects a single string argument.");
+ return 1;
+ }
+
+ query = c_psql_query_new (ci->values[0].value.string);
+
+ for (i = 0; i < ci->children_num; ++i) {
+ oconfig_item_t *c = ci->children + i;
+
+ if (0 == strcasecmp (c->key, "Query"))
+ config_set ("Query", &query->query, c);
+ else if (0 == strcasecmp (c->key, "Column"))
+ config_set_column (query, c);
+ else
+ log_warn ("Ignoring unknown config key \"%s\".", c->key);
+ }
+ return 0;
+} /* c_psql_config_query */
+
static int c_psql_config_database (oconfig_item_t *ci)
{
c_psql_database_t *db;
@@ -501,6 +756,8 @@ static int c_psql_config_database (oconfig_item_t *ci)
config_set ("KRBSrvName", &db->krbsrvname, c);
else if (0 == strcasecmp (c->key, "Service"))
config_set ("Service", &db->service, c);
+ else if (0 == strcasecmp (c->key, "Query"))
+ config_set_query (db, c);
else
log_warn ("Ignoring unknown config key \"%s\".", c->key);
}
@@ -514,7 +771,9 @@ static int c_psql_config (oconfig_item_t *ci)
for (i = 0; i < ci->children_num; ++i) {
oconfig_item_t *c = ci->children + i;
- if (0 == strcasecmp (c->key, "Database"))
+ if (0 == strcasecmp (c->key, "Query"))
+ c_psql_config_query (c);
+ else if (0 == strcasecmp (c->key, "Database"))
c_psql_config_database (c);
else
log_warn ("Ignoring unknown config key \"%s\".", c->key);
--
1.5.6.1.156.ge903b
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
Url : http://mailman.verplant.org/pipermail/collectd/attachments/20080718/c5144bf5/attachment-0001.pgp
More information about the collectd
mailing list