Loading data from Vertica efficiently

Original Postby hdimitriou » Thu Jan 03, 2013 9:22 am

At my company we are interested in loading user profiles from Vertica into Aerospike. I have read that “…Aerospike works with our proprietary system and HP Vertica’s data warehouse…” and I am wondering whether there is a special way to load data from Vertica into Aerospike. As I see it, we could script a query that dumps data from Vertica into a file and then load it line by line into Aerospike using the ‘cli’ API, or we could write code in Java/C (using a thread for each write?) to do the same. What would you suggest to maximize performance?

Postby young » Thu Jan 03, 2013 5:16 pm

You can do it the way you have described, and that will work. The only issue you may face is that a single process loading the data may take a long time. In general we recommend using multiple processes, perhaps spread over multiple hosts, to load the data more quickly.

Aerospike customers that use Hadoop will normally use the already parallel structure of their Hadoop cluster to load the data efficiently from multiple Hadoop nodes. If this is not practical or feasible, you may simply want to export your data from your data warehouse into different files and then load them in parallel. Here is a link to an example csv loader with an associated README file:

young
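
As a rough illustration of the "export into different files and load them in parallel" suggestion, here is a minimal sketch in C that deals the lines of one exported CSV file out to several shard files, round-robin. The function name `split_lines` and the buffer size are assumptions made for this example; they are not part of the Aerospike loader.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: copy each line of `in` to one of `n` output
 * streams, rotating through them. Returns the number of lines copied,
 * or -1 on a write error. A line longer than the buffer stays in one
 * shard because the shard index only advances after a newline. */
long split_lines(FILE *in, FILE **outs, int n)
{
	char buf[4096];
	long lines = 0;
	int shard = 0;
	int partial = 0;   /* set while inside a line with no newline seen yet */

	while (fgets(buf, sizeof buf, in) != NULL) {
		if (fputs(buf, outs[shard]) == EOF) {
			return -1;
		}
		size_t len = strlen(buf);
		if (len > 0 && buf[len - 1] == '\n') {
			lines++;
			shard = (shard + 1) % n;
			partial = 0;
		} else {
			partial = 1;
		}
	}
	/* Count a final line that was not terminated by a newline. */
	if (partial) {
		lines++;
	}
	return lines;
}
```

Each shard file could then be handed to its own loader process or thread, possibly on a different host.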

Postby hdimitriou » Fri Jan 04, 2013 4:04 am

Thank you for your answer. The .zip file contains another csvloader.zip file that seems to be corrupted.

Postby hdimitriou » Fri Jan 04, 2013 8:49 am

Thank you for your answer. As a first test I implemented a Java class that reads rows from a file and assigns each row to a thread; it seems to be working ‘ok’.

Your attachment contains another .zip file that looks corrupted, can you please reupload?
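
The approach described in the post above, handing rows to threads, might look roughly like this in C, using a fixed number of workers rather than one thread per row. `store_fn`, `demo_store`, `worker_arg`, `load_rows`, and `MAX_WORKERS` are all hypothetical names for this sketch. `demo_store` merely stands in for the real database write.

```c
#include <pthread.h>
#include <stddef.h>

#define MAX_WORKERS 64   /* arbitrary cap chosen for this sketch */

/* Hypothetical callback type standing in for the real database write.
 * Returns 0 on success. */
typedef int (*store_fn)(const char *row);

typedef struct {
	const char **rows;   /* all rows, shared read-only */
	size_t n_rows;
	size_t first;        /* index of this worker's first row */
	size_t stride;       /* total number of workers */
	store_fn store;
	size_t stored;       /* rows this worker stored successfully */
} worker_arg;

/* Stand-in store function: accepts any non-empty row. */
int demo_store(const char *row)
{
	return (row != NULL && row[0] != '\0') ? 0 : -1;
}

/* Each worker handles rows first, first+stride, first+2*stride, ... */
static void *worker(void *p)
{
	worker_arg *w = p;
	for (size_t i = w->first; i < w->n_rows; i += w->stride) {
		if (w->store(w->rows[i]) == 0) {
			w->stored++;
		}
	}
	return NULL;
}

/* Run n_threads workers over rows. Returns the number of rows stored,
 * or -1 if the thread count is out of range or a thread failed to start. */
long load_rows(const char **rows, size_t n_rows, int n_threads, store_fn store)
{
	pthread_t th[MAX_WORKERS];
	worker_arg args[MAX_WORKERS];
	int started = 0;
	long total = 0;

	if (n_threads < 1 || n_threads > MAX_WORKERS) {
		return -1;
	}
	for (int t = 0; t < n_threads; t++) {
		args[t].rows = rows;
		args[t].n_rows = n_rows;
		args[t].first = (size_t) t;
		args[t].stride = (size_t) n_threads;
		args[t].store = store;
		args[t].stored = 0;
		if (pthread_create(&th[t], NULL, worker, &args[t]) != 0) {
			break;
		}
		started++;
	}
	for (int t = 0; t < started; t++) {
		pthread_join(th[t], NULL);
		total += (long) args[t].stored;
	}
	return (started == n_threads) ? total : -1;
}
```

Because every worker keeps its own counter and only reads the shared row array, no locking is needed in this sketch.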

Postby hdimitriou » Fri Jan 04, 2013 9:25 am

I realized the csvloader.zip you have inside is an executable; however, it fails to run on CentOS release 5.8 with "error while loading shared libraries: libssl.so.10: cannot open shared object file: No such file or directory".

Is there any chance you could provide me with a CentOS 5 compatible version of it, or the source? (I’d appreciate it a lot if you could share some code on this.)

Postby young » Fri Jan 04, 2013 2:25 pm

Attached is a copy of the CSV Loader source code. This was written in C and requires the use of the Aerospike C library, but it should help in understanding how it works. Please send us any comments.

The full text of the C code can be found below.

/*
 *  Aerospike Foundation
 *  src/csvloader.c - Uploads data from a csv file.
 *
 * Copyright (c) 2008-2013 Aerospike, Inc. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is furnished to do
 * so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */


#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <inttypes.h>
#include <stdbool.h>
#include <dirent.h>
#include <fcntl.h>
#include <ctype.h>
#include <sys/stat.h>
#include <pthread.h>
#include "citrusleaf/cf_shash.h"
#include "citrusleaf/citrusleaf.h"
#include "csvloader.h"

// #define DEBUG 1
// #define DEBUG_VERBOSE 1


// Returns a pointer to the first non-whitespace character of s.
// If s was allocated on the heap, keep a copy of the original pointer
// returned by malloc so that it can be freed properly.
char * ltrim(char * s)
{
	while(isspace((unsigned char) *s)) {
		s++;
	}
	return s;
}

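// Strips trailing whitespace in place by writing a terminator after the
// last non-space character. The trimmed-off bytes stay allocated but unused.

char * rtrim(char * s)
{
	char * last = s + strlen(s);
	// Stop at the start of the string so that an empty or all-whitespace
	// input does not read before the buffer.
	while (last > s && isspace((unsigned char) *(last - 1))) {
		last--;
	}
	*last = '\0';
	return s;
}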

char * trim(char * s)
{
	return rtrim( ltrim(s) );
}


/*
// CAVEAT 1: Do not dereference the return value of this function without checking it first
// (it is NULL when the function has just read the last token of the string).

INPUT: 	It takes a string s, the address of an integer variable val, the address of a bool
		variable empty and the address of a bool variable valid.

RETURNS: It returns a pointer to the next token location (just after the delimiter), or NULL
		 after reading the last token.

It sets valid to true if the trimmed token contains a valid integer,
i.e. matches the regex [space]*[-+]?[digit]*[space]*, else false.

It sets empty to true if the token contains only whitespace or nothing at all, else false.

*/
char *atoi_tok(char *s, int *val, bool *empty, bool *valid)
{
	*empty = true;
	*valid = true;
	
	//removing whitespaces..
	while( isspace(*s) ) {
		s++;
	}

	//checking for negative value
	bool is_neg = false;
	if (*s == '-') { 
		is_neg = true; 
		s++; 
	} else if (*s == '+') {
		s++;
	}

	//calculating value..
	unsigned int i = 0;
	while (*s >= '0' && *s <= '9') {
		*empty = false;
		i *= 10;
		i += *s - '0';
		s++;
	}
	// apply the sign only when at least one digit was read
	if (is_neg && !*empty) {
		i = -i;
	}
	*val = (int) i;
	//ignoring trailing whitespaces..
	while(isspace(*s)) {
		s++;
	}


	//checking for delimiter..
	if (*s == ',') {
		return s+1;
	} 
	else if (*s == '\0') {
		return NULL;
	}

	//if the code comes here, that means the string is invalid..
	*valid = false;
	while(*s != ',' && *s != '\0') {
		s++;
	}
	
	//Getting the location of next token...
	if (*s == ',') {
		*s = '\0';
		return s+1;
	} 
	else if ( *s == '\0') {
		return NULL;
	}
	return NULL;
}


/*
INPUT: address of a string and an address of a bool type variable

RETURNS: It returns pointer to next token location (Just after the delimiter ) and NULL after the 
reading the last token.

If the token is empty or just contains whitespaces, empty would be set as true else false.
The string s will contain the trimmed token (trimmed of whitespaces). 
*/
char *getstr_comma_tok(char **s, bool *empty)
{
	char *start;
	char * end;
	char *temp;

	start = *s;
	end = *s;
	temp = start;
	*empty = true;
	while(isspace(*temp) && *temp != '\n') {
		temp++;
	}

	start = temp;
	while ( *temp != ',' && *temp != '\0' && *temp != '\n') {	
		*empty=false;
		temp++;
		end = temp;
		while(isspace(*temp)) {
			temp++;
		}
	}
	*s = start;
	if (*temp == ',') {
		 *end = '\0';
		return( temp +1);
	} 
	else if (*temp == '\0' ) {
		*end = '\0';
		return NULL;
	}
	return NULL;
}

// Some history
// the backup program is writing bite-sized chunks - files about 16M in size
// so it's reasonable to read one in by the whole
// although it's probably better to make smaller reads and slide the buffer
// around (like 1M reads). Doing this as an MMAP might make sense, but the MMAP
// code has been non-performant in Linux for several years.
//
// This code allocates a single buffer and does one read (on each thread). If you
// attempt to 'do the right thing' and read smaller chunks, you will get bit by
// some kind of more subtle bug that I haven't found yet. So you'll see a whole
// loop down there that's never used.
//

int
load(char *filename, FILE *fd, config *c)
{
	int rv = 0;

#ifdef DEBUG	
	fprintf(stderr, "starting filename: %s FILE %p timeout %d\n",filename,fd,c->timeout);
#endif	
	
	size_t len = 10000;
	char line[len];
	
	if(fd == NULL ) {
		fprintf(stderr, "Error: Opening file %s failed with error = %s\n",
		filename, strerror(errno));
		exit (1);
	}

	//Parsing the format file..

	char fm_file[500];
	
	snprintf(fm_file, sizeof(fm_file), "%s", c->format);

	FILE * fmt = NULL; 
	fmt = fopen(fm_file, "r");

	if (fmt == NULL) {
		fprintf(stderr, "Opening file %s failed with error = %s\n",fm_file, strerror(errno));
		exit (1);
	}

	if (fgets(line, len, fmt) == NULL) {
		line[0] = '\0';	// empty format file; caught by the num_obj check below
	}

	int llen = strlen(line);	
	char * tmp;

	// replacing the end of line char
	if (llen >= 2 && line[llen-2] == '\r') { // Windows line ending (\r\n)
		line[llen-2]='\0';
	} else if (llen >= 1 && line[llen-1] == '\n') { // Unix line ending (\n)
		line[llen-1]='\0';
	}

	// Counting number of objects ( key + bins)
	// This function should be changed to include commas in data
	int num_obj =0;  	//Number of total objects (bins + key)
	tmp = strtok(line, ",");
	while (tmp != NULL)
	{
        	num_obj++;
        	tmp = strtok(NULL, ",");
	}
	
	if (fgets(line, len, fmt) == NULL) {
		line[0] = '\0';	// missing second line; caught by the index check below
	}
	llen = strlen(line);

	// replacing the end of line char
	if (llen >= 2 && line[llen-2] == '\r') { // Windows line ending (\r\n)
		line[llen-2]='\0';
	} else if (llen >= 1 && line[llen-1] == '\n') { // Unix line ending (\n)
		line[llen-1]='\0';
	}

	//Error case : When first line is not there in the format file.
	if(num_obj == 0) {
                fprintf(stderr, "Error: Old bin names are not specified in the format.\n");
                exit(1);
        }


	//Getting the name of bins .. 	
	int key_index=-1; 	//contains the index of key in the csv file..
	bool key_appeared = 0; 	// use for marking the appearance of key. 
				//Also used for error handling.
	int index = 0;
	cl_bin bins_buf[num_obj];

	tmp = strtok(line, ",");
	// Second condition is not redundant, it is to avoid seg fault for corrupt data
	while(tmp != NULL && index < num_obj)
    	{
		tmp = trim(tmp);
		//take care of case when 2 key comes in a record...
		if(strcmp(tmp, "key") == 0) {
			if(key_appeared == 1) {
				//Error case: When more then one key is found in the format file.
				fprintf(stderr, "Error: Composite keys are not supported.\n");
				exit(1);
			}
			key_index = index;
			key_appeared = 1;
		} else if(*tmp != '\0'){
			strcpy(bins_buf[index-key_appeared].bin_name, tmp);
        	} else {
			//Error case: when null/whitespaces values are found as bin name.
			fprintf(stderr, "Error: NULL values in the format file.\n");
			exit(1);
		}
		tmp = strtok(NULL, ",");
        	index++;
	}

	//Error case: Second line missing in the format..
	if(index == 0) {
                fprintf(stderr, "Error: Keys and bin names are not specified in the expected format.\n");
                exit(1);
        }
	
	//Revising number of objects.
	if(tmp==NULL) {
		num_obj = index;
	}

	//Error case: no "key" word is found in the second line.
	if(key_index == -1) {
		fprintf(stderr, "Error: No key found in the format.\n");
		exit(1);
	}

	if (fgets(line, len, fmt) == NULL) {
		line[0] = '\0';	// missing third line; caught by the data-type check below
	}
	llen = strlen(line);

	// replacing the end of line char
	if (llen >= 2 && line[llen-2] == '\r') { // Windows line ending (\r\n)
		line[llen-2]='\0';
	} else if (llen >= 1 && line[llen-1] == '\n') { // Unix line ending (\n)
		line[llen-1]='\0';
	}

	
	index =0;
	key_appeared = 0;



	//Getting types of bins... 
	char * key_type = NULL;	// type of key..
	tmp = strtok(line, ",");

	// Second condition is not redundant, it is to avoid seg fault for corrupt data
	while(tmp != NULL && index < num_obj )	
	{
		tmp = trim(tmp);
		if (index == key_index) {
			key_appeared = 1;
			key_type = tmp;
			if (strncmp(tmp, "int", 3) !=0 && strncmp(tmp,"str", 3) != 0 ) {
				fprintf(stderr, "Error: Ambiguous type for key.\n");
				exit(1);
			}
		} 
		else {
        		if (strncmp(tmp, "int", 3) == 0) {
           	    		bins_buf[index-key_appeared].object.type = CL_INT;
        		} 
			else if (strncmp(tmp,"str", 3) == 0) {
        	    		bins_buf[index-key_appeared].object.type = CL_STR;
        		}
			else {
				fprintf(stderr, "Error: Ambiguous bin type in the format file.\n");
				exit(1);
			}
		}
		index++;
		tmp = strtok(NULL, ",");
	}

	if(index != num_obj && tmp == NULL) {
		fprintf(stderr, "Error: Some bins/key are not assigned any data-type.\n");
		exit(1);
	}
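	// The format file is no longer needed; close it so that loading many
	// data files does not leak file handles.
	fclose(fmt);
	//Parsing of format file is done here..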
///////////////////////////////////////////////

	cl_bin *bins = 0, *bin = 0;
	
	// find out the size of the file
	fseek(fd,0,SEEK_END);
	off_t READ_SIZE = ftell(fd);
	fseek(fd,0,SEEK_SET);
    
	// allocate a large block..
	uint8_t *buf = malloc(READ_SIZE);
	if (!buf) {
		fprintf(stderr, "out of memory reading a buffer for disk, aborting\n");
		return(-1);
	}
	
	// buffer offset
	size_t off = 0;
	// for prints, file offset
	size_t file_off = 0;
	
	do {

#ifdef DEBUG		
		fprintf(stderr, "read a chunk of %s file: offset %zu fileoffset %zu\n",filename,off,file_off);
#endif	

		// read a block 	
		size_t s = fread(buf + off, 1, READ_SIZE - off, fd);
		if (s == 0) {
#ifdef DEBUG			
			fprintf(stderr, "done! %s s %zu\n",filename,s);
#endif	
			goto Cleanup;
		}

		file_off += s;
		
		char *lim = (char *) (buf + (s + off));
		char *r_s = (char *) buf; // record start
		char *l_s; // line start
		char *l_e = 0; // line end
		do {
			char *ns;
			char *set;
			ns  = c->ns;
			set = c->set;
			l_s = r_s;
			bool skip = 0;  // to keep track of corrupt record..

#ifdef DEBUG_VERBOSE
			fprintf(stderr, "next line: parse header: offset %td\n",l_s - (char *) buf);
#endif			
	
			//parsing the .csv file.. 
			l_e = l_s;
			while (*l_e++ != '\n') {
				if (l_e == lim) goto BufEnd;
			}
			//Replacing the last character of line with \0
			*(l_e-1) = 0;


			// Assumes that field values contain no commas; quoted fields are not handled yet.
			cl_object key_o;
			int curr_obj_size = 0;
			cl_bin curr_bins[num_obj];
			char * str_value;
			int int_value;
			key_appeared=0;
			for (int i=0; i<num_obj && l_s != NULL; i++) {
				
				bool valid=true;
				bool empty=true;
				
				if(i == key_index) {
					key_appeared = 1;
					
					if(strncmp(key_type,"str", 3) == 0) {
						str_value = l_s;
						l_s = getstr_comma_tok(&str_value, &empty);  // will return the next char address  if it founds a null value..
						
						if (empty == true) {
							fprintf(stderr, "Key value not found in some record.\n");
							skip =1;
							break;
						} else {
							citrusleaf_object_init_str(&key_o, str_value);
						}
	
					} else if (strncmp(key_type,"int", 3) == 0){
						l_s = atoi_tok(l_s, &int_value, &empty, &valid); 

						if(valid == false){
							skip = 1;
							break;
						}
						else if (empty == true){
							fprintf(stderr, "Key value not found in some record.\n");
							skip = 1;
							break;
						} 
						else {
							citrusleaf_object_init_int(&key_o, int_value);
#ifdef DEBUG
							fprintf(stderr, "key.. %d\n",int_value );
#endif
						}

					}

				 } else {

					if(bins_buf[i-key_appeared].object.type == CL_STR) {
						str_value = l_s;
						l_s = getstr_comma_tok(&str_value, &empty);   //get the value
						
						if(empty == false) {
							curr_bins[curr_obj_size] = bins_buf[i-key_appeared];
							citrusleaf_object_init_str(&(curr_bins[curr_obj_size].object), str_value);    //attaching the object..
							curr_obj_size++;
						}
	
					} 
					else if(bins_buf[i-key_appeared].object.type == CL_INT) {
						l_s = atoi_tok(l_s, &int_value, &empty, &valid);

						if(valid == false) {
							fprintf(stderr, "Ambiguous data. Expecting integer instead of string.\n");
							skip =  1;
							break;
						} else if(empty == false ) {
							curr_bins[curr_obj_size] = bins_buf[i-key_appeared];
							citrusleaf_object_init_int(&(curr_bins[curr_obj_size].object), int_value);
							curr_obj_size++;
						}

					}
				}

			}
			if (key_appeared == 0) {
				fprintf(stderr, "Error: No key found for the record in the data file.\n");
				skip =1;
			}
			l_s = l_e;

			if(skip == 1) {
				r_s = l_s;
				continue;
			}
#ifdef DEBUG_VERBOSE
			fprintf(stderr, "load test: namespace %s set %s\n",ns,set);
#endif			

			bins = bin = &bins_buf[0];
			cl_rv rv;
			
			// compared with the real world
			//atomic_int_add(c->notfound_counter, 1);
			// if (c->verbose) fprintf(stderr, "get failed in load: NOTFOUND\n");
			cl_write_parameters cl_w_p;
			cl_write_parameters_set_default(&cl_w_p);
			rv = citrusleaf_put(c->asc, ns, set, &key_o, curr_bins, curr_obj_size, &cl_w_p);
			if (rv != CITRUSLEAF_OK) {
				fprintf(stderr, "put failed in load: rv %d\n",rv);
				if(rv == 4) {
					fprintf(stderr, "Namespace mismatch\n");
				}
				atomic_int_add(c->fail_counter, 1);
			}
	
			atomic_int_add(c->records_counter, 1);
				
			bins = 0;
			
			r_s = l_s;
		} while(r_s < lim);

BufEnd:
		// take the partial and the end and slide it to the front
		if (r_s < lim) {
			// source and destination may overlap, so memmove is required
			memmove( buf, r_s, lim - r_s);
#ifdef DEBUG			
fprintf(stderr, "%s: end of buffer: moved %zu bytes from %p to %p\n",filename,lim - r_s, r_s, buf);
#endif			
		}
		else if (r_s == lim) {
#ifdef DEBUG			
fprintf(stderr, "%s: end of buffer: exact match in size: %p %p\n",filename, r_s, lim);
#endif			
		}
		else {
#ifdef DEBUG			
fprintf(stderr, "%s: end of buffer: record start is beyond limit: serious internal error: rs %p lim %p\n",filename, r_s, lim);
#endif						
		}
		off = lim - r_s;
	} while(1);
Cleanup:
	free(buf);	
	if (bins && (bins_buf != bins))	free(bins);
	
	return(rv);
}

typedef struct load_structure_s {
    struct load_structure_s *next;
    config *c;
    FILE *fd;
    char *filename; // debug prints like this
    pthread_t   th;
} load_struct;


void *
do_load(void *arg)
{
	load_struct *rs = (load_struct *) arg;

	rs->fd = fopen(rs->filename, "r");
	if (rs->fd) {
		load(rs->filename, rs->fd, rs->c);

		// close the file & get out of dodge
		fclose(rs->fd);
	}
	else {
		fprintf(stderr, "cannot open file %s\n", rs->filename);
	}
	return(0);
}



typedef struct {
	
	atomic_int		*records;
	atomic_int		*timeouts;
	atomic_int		*fails;
	atomic_int		*data_mismatches;
	atomic_int		*notfound;
	
	int				death;
	pthread_t		th;
} counter_thread_control;

void *
counter_fn(void *arg)
{
	counter_thread_control *ctc = (counter_thread_control *) arg;
	
	while (ctc->death == 0) {
		sleep(1);
//		fprintf(stderr, "load: records %"PRIu64" bytes %"PRIu64"\n",
//			atomic_int_get(ctc->records), atomic_int_get(ctc->bytes) );
		fprintf(stderr, "load: records %"PRIu64" timeouts %"PRIu64" fails %"PRIu64" wrongdata %"PRIu64" notfound %"PRIu64"\n",
			atomic_int_get(ctc->records), atomic_int_get(ctc->timeouts) , atomic_int_get(ctc->fails),
			atomic_int_get(ctc->data_mismatches) , atomic_int_get(ctc->notfound) );
	}
	return(0);
}

void *
start_counter_thread(atomic_int *records, atomic_int *timeouts, atomic_int *fails, atomic_int *data_mismatches, atomic_int *notfound)
{
	counter_thread_control *ctc = (counter_thread_control *) malloc(sizeof(counter_thread_control));
	ctc->records = records;
	ctc->timeouts = timeouts;
	ctc->fails = fails;
	ctc->data_mismatches = data_mismatches;
	ctc->notfound = notfound;
	ctc->death = 0;
	pthread_create(&ctc->th, 0, counter_fn, ctc);
	return(ctc);
}


void
stop_counter_thread(void *control)
{
	counter_thread_control *ctc = (counter_thread_control *)control;
	ctc->death = 1;
	pthread_join(ctc->th, 0);
	free(ctc);
}


void usage(void) {
	fprintf(stderr, "Usage key_c:\n");
	fprintf(stderr, "-h host [default 127.0.0.1] \n");
	fprintf(stderr, "-p port [default 3000]\n");
	fprintf(stderr, "-t threads [default 40]\n");
	fprintf(stderr, "-d directory [default cl_load]\n");
	fprintf(stderr, "-n namespace [required]\n");
	fprintf(stderr, "-m milliseconds timeout [default 100]\n");
	fprintf(stderr, "-v is verbose\n");
	fprintf(stderr, "-s is set [default \"\"]\n");
	fprintf(stderr, "-f is format file. [default format.csv]\n");
}

load_struct *
pop_tail(load_struct **list)
{
    if ((list == 0) || (*list == 0)) return(0);
    load_struct *e = *list;

    // find the tail    
    load_struct *prev = 0;
    while (e->next ) {
        prev = e;
        e = e->next;
    }

    // fixup list
    if (prev)
        prev->next = 0;
    else
        *list = 0;

    return(e);
}


int
main(int argc, char **argv)
{
	config c;
	memset(&c, 0, sizeof(c));
	
	c.host = "127.0.0.1";
	c.port = 3000;

	c.ns = "";
	c.set = "";
	c.verbose = false;
	c.directory = "cl_load";
	c.n_threads = 40;
	c.timeout = 100;
	c.format = "format.csv";
	
//	printf("restoring the cluster from a file\n");
	int optcase;
	
	while ((optcase = getopt(argc, argv, "h:p:n:s:f:d:t:m:v")) != -1) 
	{
		switch (optcase)
		{
		case 'h':
			c.host = strdup(optarg);
			break;
		
		case 'p':
			c.port = atoi(optarg);
			break;
			
		case 'n':
			c.ns = strdup(optarg);
			break;

		case 's':
			c.set = strdup(optarg);
			break;

		case 'f':
                        c.format = strdup(optarg);
                        break;

		case 'd':
                        c.directory = strdup(optarg);
                        break;

		case 't':
			c.n_threads = atoi(optarg);
			break;

		case 'm':
			c.timeout = atoi(optarg);
			break;
			
		case 'v':
			c.verbose = true;
			break;

		default:
			usage();
			return(-1);
			
		}
	}

	if(!strcmp(c.ns, "")) {
		fprintf(stderr, "It is mandatory to specify the namespace.\n");
		exit(1);
	}
	fprintf(stderr, "Loading data: host %s port %d from directory %s\n",c.host,c.port,c.directory);

	citrusleaf_init();

	// create the cluster object - attach
	cl_cluster *asc = citrusleaf_cluster_create();
	if (!asc) { fprintf(stderr, "could not create cluster\n"); return(-1); }
	if (0 != citrusleaf_cluster_add_host(asc, c.host, c.port, 0)) {
		fprintf(stderr, "could not connect to host %s port %d\n",c.host,c.port);
		return(-1);
	}
	c.asc = asc;
	
	c.records_counter = atomic_int_create(0);
	c.timeout_counter = atomic_int_create(0);
	c.fail_counter = atomic_int_create(0);
	c.data_mismatch_counter = atomic_int_create(0);
	c.notfound_counter = atomic_int_create(0);
	void *counter_tid = start_counter_thread( c.records_counter, c.timeout_counter, c.fail_counter, 
		c.data_mismatch_counter, c.notfound_counter );

	DIR *dir = opendir(c.directory);
	if (dir == 0) {
		fprintf(stderr, "directory %s does not exist or can't be accessed or is not a directory\n",c.directory);
		goto Cleanup;
	}

	
	load_struct *todo_head = 0;
	
	// create the list of all work to do
	struct dirent *dit;
	int load_present = 0;
	while (0 != (dit = readdir(dir))) {
		char dn[512];
		
		if(!strcmp(dit->d_name, c.format ) || !strcmp(dit->d_name, ".") || !strcmp(dit->d_name, ".."))
			continue;
		snprintf(dn, sizeof(dn), "%s/%s",c.directory,dit->d_name);
	
		load_present =1;
		load_struct *rs = malloc(sizeof(load_struct));
		rs->c = &c;
		rs->fd = 0;
		rs->filename = strdup(dn);

		rs->next = todo_head;
		todo_head = rs;
	}

	// close the directory before any early exit so the handle is not leaked
	closedir(dir);

	if(load_present == 0) {
		fprintf(stderr,"No load files present.\n");
		goto Cleanup;
	}
	
	// fire off some threads but not all (or we might swamp everyone)
	load_struct *working_head = 0;
	int i=0;
	while ( (i < c.n_threads) && todo_head) {
		
		// off the todo queue
		load_struct *rs = todo_head;
		todo_head = rs->next;
		
		// onto the working queue
		rs->next = working_head;
		working_head = rs;
		
		// and off we go!
		pthread_create( & rs->th, 0, do_load, rs); 
		
		i++;
	}
	
	
	// for every thread that completes, fire off another. 
	// Annoyingly, we really can't simply look at the head - need head and tail.
	// but performance on the list is unimportant
	while (working_head) {
		// snap off end
		load_struct *e = pop_tail(&working_head);
		
		void *res; // result from the join; ignored
		pthread_join( e->th, &res );

#ifdef DEBUG		
		fprintf(stderr, "completed file %s\n",e->filename);
#endif		

		free(e->filename);
		free(e);
		
		// start another one at the head, if available
		if (todo_head) {
			e = todo_head;
			todo_head = e->next;
			e->next = working_head;
			working_head = e;
			pthread_create(&e->th, 0, do_load, e);
		}
		
	}
	
Cleanup:
	
	stop_counter_thread(counter_tid);

	citrusleaf_cluster_destroy(asc);

	return(0);
}
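
For comparison with the hand-rolled `atoi_tok` above, here is a minimal sketch of a similar contract (skip whitespace, optional sign, digits, then a comma or the end of the string) built on the standard `strtol`, which also reports overflow. `parse_int_field` is a hypothetical name; it is not part of the loader or of the Aerospike C library. Unlike `atoi_tok`, it treats a lone sign as invalid and never modifies the input.

```c
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdlib.h>

/* Parse one comma-separated integer field starting at s.
 * Sets *empty when the field holds no number, and clears *valid when the
 * field contains stray characters or a value that does not fit in an int.
 * Returns a pointer just past the delimiter, or NULL after the last field. */
const char *parse_int_field(const char *s, int *val, bool *empty, bool *valid)
{
	char *end;

	*val = 0;
	*empty = true;
	*valid = true;

	errno = 0;
	long v = strtol(s, &end, 10);   /* strtol skips leading whitespace itself */
	if (end != s) {
		*empty = false;
		if (errno == ERANGE || v < INT_MIN || v > INT_MAX) {
			*valid = false;
		} else {
			*val = (int) v;
		}
	}

	/* Skip trailing whitespace, then expect a delimiter or the end. */
	const char *p = end;
	while (isspace((unsigned char) *p)) {
		p++;
	}
	if (*p != ',' && *p != '\0') {
		*valid = false;
		while (*p != ',' && *p != '\0') {
			p++;
		}
	}
	return (*p == ',') ? p + 1 : NULL;
}
```

As with `atoi_tok`, the caller would check `valid` and `empty` before using `val`, and stop iterating once NULL is returned.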