[collectd] [PATCH] email plugin: Simplified code.

Sebastian Harl sh at tokkee.org
Sun Apr 27 20:58:44 CEST 2008

While looking at the code for some reason, I decided to simplify and
improve large parts of it. Most notably, standard IO streams are now used
to read from the socket. This made it possible to remove large parts of the
code which had, until now, been used to read and buffer data from the socket.

Also among the changes:
 * free any allocated memory
 * added / improved log messages
 * do not require euid == 0 to chown() the socket

Signed-off-by: Sebastian Harl <sh at tokkee.org>
 src/email.c |  365 ++++++++++++++++++++++-------------------------------------
 1 files changed, 135 insertions(+), 230 deletions(-)

diff --git a/src/email.c b/src/email.c
index 869b7c3..87daed1 100644
--- a/src/email.c
+++ b/src/email.c
@@ -1,6 +1,6 @@
  * collectd - src/email.c
- * Copyright (C) 2006,2007  Sebastian Harl
+ * Copyright (C) 2006-2008  Sebastian Harl
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -58,17 +58,13 @@
 #	include <grp.h>
 #endif /* HAVE_GRP_H */
-#define MODULE_NAME "email"
-/* 256 bytes ought to be enough for anybody ;-) */
-#define BUFSIZE 256
 #define MAX_CONNS 5
 #define MAX_CONNS_LIMIT 16384
-#define log_err(...) ERROR (MODULE_NAME": "__VA_ARGS__)
-#define log_warn(...) WARNING (MODULE_NAME": "__VA_ARGS__)
+#define log_debug(...) DEBUG ("email: "__VA_ARGS__)
+#define log_err(...) ERROR ("email: "__VA_ARGS__)
+#define log_warn(...) WARNING ("email: "__VA_ARGS__)
  * Private data structures
@@ -90,19 +86,15 @@ typedef struct collector {
 	pthread_t thread;
 	/* socket descriptor of the current/last connection */
-	int socket;
+	FILE *socket;
 } collector_t;
 /* linked list of pending connections */
 typedef struct conn {
 	/* socket to read data from */
-	int socket;
-	/* buffer to read data to */
-	char *buffer;
-	int  idx; /* current write position in buffer */
-	int  length; /* length of the current line, i.e. index of '\0' */
+	FILE *socket;
+	/* linked list of connections */
 	struct conn *next;
 } conn_t;
@@ -125,8 +117,8 @@ static const char *config_keys[] =
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 /* socket configuration */
-static char *sock_file  = SOCK_PATH;
-static char *sock_group = COLLECTD_GRP_NAME;
+static char *sock_file  = NULL;
+static char *sock_group = NULL;
 static int  sock_perms  = S_IRWXU | S_IRWXG;
 static int  max_conns   = MAX_CONNS;
@@ -154,17 +146,20 @@ static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER;
 static int available_collectors;
 static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
-static type_list_t count;
+static type_list_t list_count;
+static type_list_t list_count_copy;
 static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER;
-static type_list_t size;
+static type_list_t list_size;
+static type_list_t list_size_copy;
 static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER;
 static double score;
 static int score_count;
 static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER;
-static type_list_t check;
+static type_list_t list_check;
+static type_list_t list_check_copy;
  * Private functions
@@ -172,9 +167,13 @@ static type_list_t check;
 static int email_config (const char *key, const char *value)
 	if (0 == strcasecmp (key, "SocketFile")) {
+		if (NULL != sock_file)
+			free (sock_file);
 		sock_file = sstrdup (value);
 	else if (0 == strcasecmp (key, "SocketGroup")) {
+		if (NULL != sock_group)
+			free (sock_group);
 		sock_group = sstrdup (value);
 	else if (0 == strcasecmp (key, "SocketPerms")) {
@@ -241,139 +240,10 @@ static void type_list_incr (type_list_t *list, char *name, int incr)
 } /* static void type_list_incr (type_list_t *, char *) */
-/* Read a single character from the socket. If an error occurs or end-of-file
- * is reached return '\0'. */
-static char read_char (conn_t *src)
-	char ret = '\0';
-	fd_set fdset;
-	FD_ZERO (&fdset);
-	FD_SET (src->socket, &fdset);
-	if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
-		char errbuf[1024];
-		log_err ("select() failed: %s",
-				sstrerror (errno, errbuf, sizeof (errbuf)));
-		return '\0';
-	}
-	assert (FD_ISSET (src->socket, &fdset));
-	do {
-		ssize_t len = 0;
-		errno = 0;
-		if (0 > (len = read (src->socket, (void *)&ret, 1))) {
-			if (EINTR != errno) {
-				char errbuf[1024];
-				log_err ("read() failed: %s",
-						sstrerror (errno, errbuf, sizeof (errbuf)));
-				return '\0';
-			}
-		}
-		if (0 == len)
-			return '\0';
-	} while (EINTR == errno);
-	return ret;
-} /* static char read_char (conn_t *) */
-/* Read a single line (terminated by '\n') from the the socket.
- *
- * The return value is zero terminated and does not contain any newline
- * characters.
- *
- * If an error occurs or end-of-file is reached return NULL.
- *
- * IMPORTANT NOTE: If there is no newline character found in BUFSIZE
- * characters of the input stream, the line will will be ignored! By
- * definition we should not get any longer input lines, thus this is
- * acceptable in this case ;-) */
-static char *read_line (conn_t *src)
-	int i = 0;
-	assert ((BUFSIZE >= src->idx) && (src->idx >= 0));
-	assert ((src->idx > src->length) || (src->length == 0));
-	if (src->length > 0) { /* remove old line */
-		src->idx -= (src->length + 1);
-		memmove (src->buffer, src->buffer + src->length + 1, src->idx);
-		src->length = 0;
-	}
-	for (i = 0; i < src->idx; ++i) {
-		if ('\n' == src->buffer[i])
-			break;
-	}
-	if (i == src->idx) {
-		fd_set fdset;
-		ssize_t len = 0;
-		FD_ZERO (&fdset);
-		FD_SET (src->socket, &fdset);
-		if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
-			char errbuf[1024];
-			log_err ("select() failed: %s",
-					sstrerror (errno, errbuf, sizeof (errbuf)));
-			return NULL;
-		}
-		assert (FD_ISSET (src->socket, &fdset));
-		do {
-			errno = 0;
-			if (0 > (len = read (src->socket,
-							(void *)(src->buffer + src->idx),
-							BUFSIZE - src->idx))) {
-				if (EINTR != errno) {
-					char errbuf[1024];
-					log_err ("read() failed: %s",
-							sstrerror (errno, errbuf, sizeof (errbuf)));
-					return NULL;
-				}
-			}
-			if (0 == len)
-				return NULL;
-		} while (EINTR == errno);
-		src->idx += len;
-		for (i = src->idx - len; i < src->idx; ++i) {
-			if ('\n' == src->buffer[i])
-				break;
-		}
-		if (i == src->idx) {
-			src->length = 0;
-			if (BUFSIZE == src->idx) { /* no space left in buffer */
-				while ('\n' != read_char (src))
-					/* ignore complete line */;
-				src->idx = 0;
-			}
-			return read_line (src);
-		}
-	}
-	src->buffer[i] = '\0';
-	src->length    = i;
-	return src->buffer;
-} /* static char *read_line (conn_t *) */
 static void *collect (void *arg)
 	collector_t *this = (collector_t *)arg;
-	char *buffer = (char *)smalloc (BUFSIZE);
+	pthread_t    self = pthread_self ();
 	while (1) {
 		int loop = 1;
@@ -393,44 +263,51 @@ static void *collect (void *arg)
 			conns.tail = NULL;
-		this->socket = connection->socket;
 		pthread_mutex_unlock (&conns_mutex);
-		connection->buffer = buffer;
-		connection->idx    = 0;
-		connection->length = 0;
+		/* make the socket available to the global
+		 * thread and connection management */
+		this->socket = connection->socket;
-		{ /* put the socket in non-blocking mode */
-			int flags = 0;
+		log_debug ("[thread #%5lu] handling connection on fd #%i",
+				self, fileno (this->socket));
-			errno = 0;
-			if (-1 == fcntl (connection->socket, F_GETFL, &flags)) {
-				char errbuf[1024];
-				log_err ("fcntl() failed: %s",
-						sstrerror (errno, errbuf, sizeof (errbuf)));
-				loop = 0;
-			}
+		while (loop) {
+			/* 256 bytes ought to be enough for anybody ;-) */
+			char line[256 + 1]; /* line + '\0' */
+			int  len = 0;
 			errno = 0;
-			if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) {
-				char errbuf[1024];
-				log_err ("fcntl() failed: %s",
-						sstrerror (errno, errbuf, sizeof (errbuf)));
+			if (NULL == fgets (line, sizeof (line), this->socket)) {
 				loop = 0;
+				if (0 != errno) {
+					char errbuf[1024];
+					log_err ("[thread #%5lu] reading from socket (fd #%i) "
+							"failed: %s", self, fileno (this->socket),
+							sstrerror (errno, errbuf, sizeof (errbuf)));
+				}
+				break;
-		}
-		while (loop) {
-			char *line = read_line (connection);
+			len = strlen (line);
+			if (('\n' != line[len - 1]) && ('\r' != line[len - 1])) {
+				log_warn ("[thread #%5lu] line too long (> %i characters): "
+						"'%s' (truncated)", self, sizeof (line) - 1, line);
-			if (NULL == line) {
-				loop = 0;
-				break;
+				while (NULL != fgets (line, sizeof (line), this->socket))
+					if (('\n' == line[len - 1]) || ('\r' == line[len - 1]))
+						break;
+				continue;
+			line[len - 1] = '\0';
+			log_debug ("[thread #%5lu] line = '%s'", self, line);
 			if (':' != line[1]) {
-				log_err ("syntax error in line '%s'", line);
+				log_err ("[thread #%5lu] syntax error in line '%s'",
+						self, line);
@@ -441,19 +318,20 @@ static void *collect (void *arg)
 				int  bytes = 0;
 				if (NULL == tmp) {
-					log_err ("syntax error in line '%s'", line);
+					log_err ("[thread #%5lu] syntax error in line '%s'",
+							self, line);
 				bytes = atoi (tmp);
 				pthread_mutex_lock (&count_mutex);
-				type_list_incr (&count, type, 1);
+				type_list_incr (&list_count, type, 1);
 				pthread_mutex_unlock (&count_mutex);
 				if (bytes > 0) {
 					pthread_mutex_lock (&size_mutex);
-					type_list_incr (&size, type, bytes);
+					type_list_incr (&list_size, type, bytes);
 					pthread_mutex_unlock (&size_mutex);
@@ -470,19 +348,22 @@ static void *collect (void *arg)
 				do {
 					pthread_mutex_lock (&check_mutex);
-					type_list_incr (&check, type, 1);
+					type_list_incr (&list_check, type, 1);
 					pthread_mutex_unlock (&check_mutex);
 				} while (NULL != (type = strtok_r (NULL, ",", &ptr)));
 			else {
-				log_err ("unknown type '%c'", line[0]);
+				log_err ("[thread #%5lu] unknown type '%c'", self, line[0]);
 		} /* while (loop) */
-		close (connection->socket);
+		log_debug ("[thread #%5lu] shutting down connection on fd #%i",
+				pthread_self (), fileno (this->socket));
+		fclose (connection->socket);
 		free (connection);
-		this->socket = -1;
+		this->socket = NULL;
 		pthread_mutex_lock (&available_mutex);
@@ -491,7 +372,6 @@ static void *collect (void *arg)
 		pthread_cond_signal (&collector_available);
 	} /* while (1) */
-	free (buffer);
 	pthread_exit ((void *)0);
 } /* static void *collect (void *) */
@@ -499,6 +379,9 @@ static void *open_connection (void *arg)
 	struct sockaddr_un addr;
+	char *path  = (NULL == sock_file) ? SOCK_PATH : sock_file;
+	char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group;
 	/* create UNIX socket */
 	errno = 0;
 	if (-1 == (connector_socket = socket (PF_UNIX, SOCK_STREAM, 0))) {
@@ -511,7 +394,7 @@ static void *open_connection (void *arg)
 	addr.sun_family = AF_UNIX;
-	strncpy (addr.sun_path, sock_file, (size_t)(UNIX_PATH_MAX - 1));
+	strncpy (addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1));
 	addr.sun_path[UNIX_PATH_MAX - 1] = '\0';
 	unlink (addr.sun_path);
@@ -521,7 +404,8 @@ static void *open_connection (void *arg)
 					+ strlen(addr.sun_path))) {
 		char errbuf[1024];
 		disabled = 1;
-		connector_socket = -1; /* TODO: close? */
+		close (connector_socket);
+		connector_socket = -1;
 		log_err ("bind() failed: %s",
 				sstrerror (errno, errbuf, sizeof (errbuf)));
 		pthread_exit ((void *)1);
@@ -531,13 +415,13 @@ static void *open_connection (void *arg)
 	if (-1 == listen (connector_socket, 5)) {
 		char errbuf[1024];
 		disabled = 1;
-		connector_socket = -1; /* TODO: close? */
+		close (connector_socket);
+		connector_socket = -1;
 		log_err ("listen() failed: %s",
 				sstrerror (errno, errbuf, sizeof (errbuf)));
 		pthread_exit ((void *)1);
-	if ((uid_t) 0 == geteuid ())
 		struct group sg;
 		struct group *grp;
@@ -545,36 +429,32 @@ static void *open_connection (void *arg)
 		int status;
 		grp = NULL;
-		status = getgrnam_r (sock_group, &sg, grbuf, sizeof (grbuf), &grp);
+		status = getgrnam_r (group, &sg, grbuf, sizeof (grbuf), &grp);
 		if (status != 0)
 			char errbuf[1024];
-			log_warn ("getgrnam_r (%s) failed: %s", sock_group,
+			log_warn ("getgrnam_r (%s) failed: %s", group,
 					sstrerror (errno, errbuf, sizeof (errbuf)));
 		else if (grp == NULL)
-			log_warn ("No such group: `%s'", sock_group);
+			log_warn ("No such group: `%s'", group);
-			status = chown (sock_file, (uid_t) -1, grp->gr_gid);
+			status = chown (path, (uid_t) -1, grp->gr_gid);
 			if (status != 0)
 				char errbuf[1024];
 				log_warn ("chown (%s, -1, %i) failed: %s",
-						sock_file, (int) grp->gr_gid,
+						path, (int) grp->gr_gid,
 						sstrerror (errno, errbuf, sizeof (errbuf)));
-	else /* geteuid != 0 */
-	{
-		log_warn ("not running as root");
-	}
 	errno = 0;
-	if (0 != chmod (sock_file, sock_perms)) {
+	if (0 != chmod (path, sock_perms)) {
 		char errbuf[1024];
 		log_warn ("chmod() failed: %s",
 				sstrerror (errno, errbuf, sizeof (errbuf)));
@@ -599,7 +479,7 @@ static void *open_connection (void *arg)
 		for (i = 0; i < max_conns; ++i) {
 			collectors[i] = (collector_t *)smalloc (sizeof (collector_t));
-			collectors[i]->socket = -1;
+			collectors[i]->socket = NULL;
 			if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr,
 							collect, collectors[i]))) {
@@ -634,7 +514,8 @@ static void *open_connection (void *arg)
 				if (EINTR != errno) {
 					char errbuf[1024];
 					disabled = 1;
-					connector_socket = -1; /* TODO: close? */
+					close (connector_socket);
+					connector_socket = -1;
 					log_err ("accept() failed: %s",
 							sstrerror (errno, errbuf, sizeof (errbuf)));
 					pthread_exit ((void *)1);
@@ -644,9 +525,14 @@ static void *open_connection (void *arg)
 		connection = (conn_t *)smalloc (sizeof (conn_t));
-		connection->socket = remote;
+		connection->socket = fdopen (remote, "r");
 		connection->next   = NULL;
+		if (NULL == connection->socket) {
+			close (remote);
+			continue;
+		}
 		pthread_mutex_lock (&conns_mutex);
 		if (NULL == conns.head) {
@@ -683,6 +569,8 @@ static int email_init (void)
 static int email_shutdown (void)
+	type_t *ptr = NULL;
 	int i = 0;
 	if (connector != ((pthread_t) 0)) {
@@ -698,6 +586,8 @@ static int email_shutdown (void)
 	/* don't allow any more connections to be processed */
 	pthread_mutex_lock (&conns_mutex);
+	available_collectors = 0;
 	if (collectors != NULL) {
 		for (i = 0; i < max_conns; ++i) {
 			if (collectors[i] == NULL)
@@ -708,18 +598,52 @@ static int email_shutdown (void)
 				collectors[i]->thread = (pthread_t) 0;
-			if (collectors[i]->socket >= 0) {
-				close (collectors[i]->socket);
-				collectors[i]->socket = -1;
+			if (collectors[i]->socket != NULL) {
+				fclose (collectors[i]->socket);
+				collectors[i]->socket = NULL;
+			sfree (collectors[i]);
+		sfree (collectors);
 	} /* if (collectors != NULL) */
 	pthread_mutex_unlock (&conns_mutex);
-	unlink (sock_file);
-	errno = 0;
+	for (ptr = list_count.head; NULL != ptr; ptr = ptr->next) {
+		free (ptr->name);
+		free (ptr);
+	}
+	for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) {
+		free (ptr->name);
+		free (ptr);
+	}
+	for (ptr = list_size.head; NULL != ptr; ptr = ptr->next) {
+		free (ptr->name);
+		free (ptr);
+	}
+	for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) {
+		free (ptr->name);
+		free (ptr);
+	}
+	for (ptr = list_check.head; NULL != ptr; ptr = ptr->next) {
+		free (ptr->name);
+		free (ptr);
+	}
+	for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) {
+		free (ptr->name);
+		free (ptr);
+	}
+	unlink ((NULL == sock_file) ? SOCK_PATH : sock_file);
+	sfree (sock_file);
+	sfree (sock_group);
 	return (0);
 } /* static void email_shutdown (void) */
@@ -785,47 +709,28 @@ static int email_read (void)
 	double score_old;
 	int score_count_old;
-	static type_list_t *cnt;
-	static type_list_t *sz;
-	static type_list_t *chk;
 	if (disabled)
 		return (-1);
-	if (NULL == cnt) {
-		cnt = (type_list_t *)smalloc (sizeof (type_list_t));
-		cnt->head = NULL;
-	}
-	if (NULL == sz) {
-		sz = (type_list_t *)smalloc (sizeof (type_list_t));
-		sz->head = NULL;
-	}
-	if (NULL == chk) {
-		chk = (type_list_t *)smalloc (sizeof (type_list_t));
-		chk->head = NULL;
-	}
 	/* email count */
 	pthread_mutex_lock (&count_mutex);
-	copy_type_list (&count, cnt);
+	copy_type_list (&list_count, &list_count_copy);
 	pthread_mutex_unlock (&count_mutex);
-	for (ptr = cnt->head; NULL != ptr; ptr = ptr->next) {
+	for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) {
 		email_submit ("email_count", ptr->name, ptr->value);
 	/* email size */
 	pthread_mutex_lock (&size_mutex);
-	copy_type_list (&size, sz);
+	copy_type_list (&list_size, &list_size_copy);
 	pthread_mutex_unlock (&size_mutex);
-	for (ptr = sz->head; NULL != ptr; ptr = ptr->next) {
+	for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) {
 		email_submit ("email_size", ptr->name, ptr->value);
@@ -845,11 +750,11 @@ static int email_read (void)
 	/* spam checks */
 	pthread_mutex_lock (&check_mutex);
-	copy_type_list (&check, chk);
+	copy_type_list (&list_check, &list_check_copy);
 	pthread_mutex_unlock (&check_mutex);
-	for (ptr = chk->head; NULL != ptr; ptr = ptr->next)
+	for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next)
 		email_submit ("spam_check", ptr->name, ptr->value);
 	return (0);

-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
Url : http://mailman.verplant.org/pipermail/collectd/attachments/20080427/712ccdff/attachment-0001.pgp 

More information about the collectd mailing list