The full text of the C code can be found below.
/*
* Aerospike Foundation
* src/csvloader.c - Uploads data from a csv file.
*
* Copyright (c) 2008-2013 Aerospike, Inc. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <stdbool.h>
#include <dirent.h>
#include <fcntl.h>
#include <ctype.h>
#include <sys/stat.h>
#include <pthread.h>
#include "citrusleaf/cf_shash.h"
#include "citrusleaf/citrusleaf.h"
#include "csvloader.h"
// #define DEBUG 1
// #define DEBUG_VERBOSE 1
// Skips leading whitespace and returns a pointer to the first non-space
// character of s (or to its terminating NUL when s is all whitespace).
// The string is not modified and nothing is allocated: the result points
// into the caller's buffer, so if that buffer came from malloc() keep the
// original pointer around for free().
char * ltrim(char * s)
{
	// <ctype.h> classifiers take an unsigned char value (or EOF); passing a
	// negative plain char is undefined behavior, hence the cast.
	while (isspace((unsigned char) *s)) {
		s++;
	}
	return s;
}
// Strips trailing whitespace from s in place by writing a NUL terminator
// right after the last non-space character, and returns s.
// The bytes behind the new terminator still belong to the caller's buffer;
// they are simply no longer part of the string.
char * rtrim(char * s)
{
	char *end = s + strlen(s);
	// Never step back past s[0]: an empty or all-whitespace string must not
	// cause a read (and a later write) in front of the buffer.
	while (end > s && isspace((unsigned char) end[-1])) {
		end--;
	}
	*end = '\0';
	return s;
}
// Removes whitespace from both ends of s: the leading run is skipped with
// ltrim() and the trailing run is cut off in place by rtrim(). The result
// points into the caller's buffer.
char * trim(char * s)
{
	char *first = ltrim(s);
	return rtrim(first);
}
/*
  Parses one comma-delimited integer token.

  INPUT:   s     - NUL-terminated text positioned at the start of a token.
           val   - receives the parsed value (0 when the token has no digits).
           empty - set to true when the token contains no digits, false otherwise.
           valid - set to true when the token matches
                   [space]*[-+](0,1)[digit]*[space]* and to false otherwise.
  RETURNS: pointer to the character just after the delimiting comma, or NULL
           when the token was the last one in the string.

  CAVEAT 1: check the return value against NULL before dereferencing it.
  CAVEAT 2: when the token is invalid, the comma that ends it is overwritten
            with a NUL terminator.
  CAVEAT 3: digits are accumulated in unsigned arithmetic, so a value that does
            not fit wraps around instead of being reported as an error.
*/
char *atoi_tok(char *s, int *val, bool *empty, bool *valid)
{
	*empty = true;
	*valid = true;

	// leading whitespace (cast: <ctype.h> needs an unsigned char value)
	while (isspace((unsigned char) *s)) {
		s++;
	}

	// optional sign
	bool is_neg = false;
	if (*s == '-') {
		is_neg = true;
		s++;
	} else if (*s == '+') {
		s++;
	}

	// digits
	unsigned int magnitude = 0;
	while (*s >= '0' && *s <= '9') {
		*empty = false;
		magnitude = magnitude * 10 + (unsigned int) (*s - '0');
		s++;
	}

	// Negate in unsigned arithmetic (well defined even for the most negative
	// value). With no digits the magnitude is 0, so the sign is harmless.
	if (is_neg) {
		magnitude = 0u - magnitude;
	}
	*val = (int) magnitude;

	// trailing whitespace
	while (isspace((unsigned char) *s)) {
		s++;
	}

	// a well-formed token ends at a comma or at the end of the string
	if (*s == ',') {
		return s + 1;
	}
	if (*s == '\0') {
		return NULL;
	}

	// Anything else makes the token invalid: report it through the caller's
	// flag and skip ahead to the next delimiter.
	*valid = false;
	while (*s != ',' && *s != '\0') {
		s++;
	}
	if (*s == ',') {
		*s = '\0';
		return s + 1;
	}
	return NULL;
}
/*
  Extracts one comma-delimited string token, trimmed of surrounding whitespace.

  INPUT:   s     - address of a pointer to the start of the token. On return *s
                   points at the trimmed, NUL-terminated token inside the same
                   buffer (an empty string when the token is empty).
           empty - set to true when the token is empty or holds only
                   whitespace, false otherwise.
  RETURNS: pointer to the character just after the delimiting comma, or NULL
           when the token was the last one in the string.

  The buffer is modified: a NUL terminator is written right after the last
  non-whitespace character of the token.
*/
char *getstr_comma_tok(char **s, bool *empty)
{
	char *cur = *s;
	*empty = true;

	// skip leading whitespace, but never run past a newline
	while (*cur != '\n' && isspace((unsigned char) *cur)) {
		cur++;
	}
	char *start = cur;
	// One past the last non-whitespace character seen so far. It starts at the
	// token itself so that an empty token is terminated at its own position
	// rather than somewhere in the whitespace in front of it.
	char *end = start;

	while (*cur != ',' && *cur != '\0' && *cur != '\n') {
		*empty = false;
		cur++;
		end = cur;
		// whitespace only counts as part of the token if more text follows
		while (isspace((unsigned char) *cur)) {
			cur++;
		}
	}

	*s = start;
	// Look at the delimiter before writing the terminator: when the token has
	// no trailing whitespace, end and cur are the same character.
	bool more = (*cur == ',');
	*end = '\0';
	return more ? cur + 1 : NULL;
}
// Some history
// The backup program writes bite-sized files (about 16M each), so reading a
// whole file into memory in one go is reasonable, although smaller reads with
// a sliding buffer would probably be better. mmap might make sense as well,
// but was judged non-performant on Linux when this was written.
//
// The read loop below copes with short reads - an unfinished record at the end
// of the buffer is slid to the front and completed by the next read - but as
// the buffer is sized to the whole file it normally runs only once.

// Reentrant stand-in for strtok(str, ","): returns the next non-empty
// comma-separated field reachable from *cursor, NUL-terminating it in place,
// or NULL when no field is left. Like strtok it skips runs of commas, so
// empty fields are never returned. load() runs on several threads at once,
// which rules out strtok's hidden static state.
static char *
next_comma_field(char **cursor)
{
	char *tok = *cursor;
	if (tok == NULL) {
		return NULL;
	}
	tok += strspn(tok, ",");
	if (*tok == '\0') {
		*cursor = NULL;
		return NULL;
	}
	char *end = tok + strcspn(tok, ",");
	if (*end == ',') {
		*end = '\0';
		*cursor = end + 1;
	} else {
		*cursor = NULL;
	}
	return tok;
}

// Reads the next line of the format file into line (capacity size) and strips
// the line ending ("\n" or "\r\n"). A missing line leaves an empty string, so
// the caller's "nothing specified" checks fire instead of stale data being
// parsed.
static void
read_format_line(char *line, size_t size, FILE *fmt)
{
	if (fgets(line, (int) size, fmt) == NULL) {
		line[0] = '\0';
		return;
	}
	size_t n = strlen(line);
	if (n > 0 && line[n - 1] == '\n') {
		line[--n] = '\0';
	}
	if (n > 0 && line[n - 1] == '\r') { // file written on Windows
		line[--n] = '\0';
	}
}

// Loads every record of one csv data file into the cluster.
//
// filename - name of the data file, used in diagnostics only
// fd       - the data file, already opened for reading by the caller
// c        - configuration: format file name, namespace, set, cluster handle
//            and the shared counters
//
// The format file (c->format) is parsed first. Line 1 is only used to count
// the columns, line 2 names each column (exactly one of them must be "key"),
// line 3 gives each column's type ("int..." or "str..."). Any problem with the
// format file terminates the whole process with exit(1).
//
// Each line of the data file is then split at commas; a record whose key is
// missing, or that has a non-numeric value in an int column, is skipped. Empty
// values are simply left out of the record. Every record that is written bumps
// c->records_counter, and additionally c->fail_counter when the write fails.
//
// Returns 0 once the file has been processed, -1 when the file size cannot be
// determined or the read buffer cannot be allocated.
int
load(char *filename, FILE *fd, config *c)
{
	int rv = 0;
#ifdef DEBUG
	fprintf(stderr, "starting filename: %s FILE %p timeout %d\n",filename,(void *) fd,c->timeout);
#endif
	enum { FORMAT_LINE_LEN = 10000 };
	char line[FORMAT_LINE_LEN];

	if (fd == NULL) {
		fprintf(stderr, "Error: Opening file %s failed with error = %s\n",
				filename, strerror(errno));
		exit (1);
	}

	// Parsing the format file..
	// Use the configured name directly; copying it into a fixed-size buffer
	// could overflow on a long path.
	const char *fm_file = c->format;
	FILE *fmt = fopen(fm_file, "r");
	if (fmt == NULL) {
		fprintf(stderr, "Opening file %s failed with error = %s\n",fm_file, strerror(errno));
		exit (1);
	}

	// Line 1: count the objects (key + bins).
	// This should be changed to allow commas inside the data.
	read_format_line(line, sizeof line, fmt);
	int num_obj = 0;
	char *cursor = line;
	char *tmp;
	while ((tmp = next_comma_field(&cursor)) != NULL) {
		num_obj++;
	}
	// Error case: first line is missing from the format file.
	if (num_obj == 0) {
		fprintf(stderr, "Error: Old bin names are not specified in the format.\n");
		exit(1);
	}

	// Line 2: names of the bins, plus the position of the key column.
	read_format_line(line, sizeof line, fmt);
	int key_index = -1;         // index of the key column in the csv file
	bool key_appeared = false;  // true once the key column has been passed;
	                            // also shifts bin indices down by one
	int index = 0;
	cl_bin bins_buf[num_obj];
	cursor = line;
	tmp = next_comma_field(&cursor);
	// The index bound is not redundant: it protects bins_buf when line 2 has
	// more fields than line 1.
	while (tmp != NULL && index < num_obj) {
		tmp = ltrim(tmp);
		if (*tmp == '\0') {
			// Error case: a bin name made of whitespace only.
			fprintf(stderr, "Error: NULL values in the format file.\n");
			exit(1);
		}
		tmp = rtrim(tmp);
		if (strcmp(tmp, "key") == 0) {
			if (key_appeared) {
				// Error case: more than one key in the format file.
				fprintf(stderr, "Error: Composite keys are not supported.\n");
				exit(1);
			}
			key_index = index;
			key_appeared = true;
		} else {
			// NOTE(review): no length check - assumes the name fits into
			// cl_bin.bin_name; confirm against the client library's limit.
			strcpy(bins_buf[index - key_appeared].bin_name, tmp);
		}
		tmp = next_comma_field(&cursor);
		index++;
	}
	// Error case: second line missing from the format file.
	if (index == 0) {
		fprintf(stderr, "Error: Keys and bin names are not specified in the expected format.\n");
		exit(1);
	}
	// Line 2 had fewer fields than line 1: it decides the object count.
	if (tmp == NULL) {
		num_obj = index;
	}
	// Error case: no "key" column among the names.
	if (key_index == -1) {
		fprintf(stderr, "Error: No key found in the format.\n");
		exit(1);
	}

	// Line 3: type of every column.
	read_format_line(line, sizeof line, fmt);
	index = 0;
	key_appeared = false;
	char *key_type = NULL; // points into line, which is not touched again
	cursor = line;
	tmp = next_comma_field(&cursor);
	while (tmp != NULL && index < num_obj) {
		tmp = ltrim(tmp);
		if (*tmp != '\0') {
			tmp = rtrim(tmp);
		}
		if (index == key_index) {
			key_appeared = true;
			key_type = tmp;
			if (strncmp(tmp, "int", 3) != 0 && strncmp(tmp, "str", 3) != 0) {
				fprintf(stderr, "Error: Ambigious type for key.\n");
				exit(1);
			}
		} else if (strncmp(tmp, "int", 3) == 0) {
			bins_buf[index - key_appeared].object.type = CL_INT;
		} else if (strncmp(tmp, "str", 3) == 0) {
			bins_buf[index - key_appeared].object.type = CL_STR;
		} else {
			fprintf(stderr, "Error: Ambigious bin type in the format file.\n");
			exit(1);
		}
		index++;
		tmp = next_comma_field(&cursor);
	}
	if (index != num_obj && tmp == NULL) {
		fprintf(stderr, "Error: Some bins/key are not assigned any data-type.\n");
		exit(1);
	}
	fclose(fmt);
	// Parsing of the format file is done here.
	///////////////////////////////////////////////

	// find out the size of the data file
	long file_size = -1;
	if (fseek(fd, 0, SEEK_END) == 0) {
		file_size = ftell(fd);
	}
	if (file_size < 0 || fseek(fd, 0, SEEK_SET) != 0) {
		fprintf(stderr, "Error: cannot determine the size of file %s: %s\n",
				filename, strerror(errno));
		return(-1);
	}
	if (file_size == 0) {
		return(rv); // nothing to load
	}
	size_t read_size = (size_t) file_size;

	// Allocate one block for the whole file, plus one byte so that a final
	// line without a line ending can be given one (see below).
	uint8_t *buf = malloc(read_size + 1);
	if (!buf) {
		fprintf(stderr, "out of memory reading a buffer for disk, aborting");
		return(-1);
	}

	size_t off = 0;       // bytes of an unfinished record kept at the buffer start
	size_t file_off = 0;  // file offset, for debug prints only
	(void) file_off;

	for (;;) {
#ifdef DEBUG
		fprintf(stderr, "read a chunk of %s file: offset %zu fileoffset %zu\n",filename,off,file_off);
#endif
		// read a block
		size_t s = fread(buf + off, 1, read_size - off, fd);
		if (s == 0) {
			if (off == 0 || ferror(fd)) {
#ifdef DEBUG
				fprintf(stderr, "done! %s s %zu\n",filename,s);
#endif
				break;
			}
			// End of file with an unterminated last line still pending:
			// terminate it so the record is loaded rather than dropped.
			buf[off] = '\n';
			s = 1;
		}
		file_off += (read_size - off);

		char *lim = (char *) (buf + (s + off)); // one past the valid data
		char *r_s = (char *) buf;               // record start

		do {
			char *ns = c->ns;
			char *set = c->set;
			char *l_s = r_s;   // parse position within the line
			bool skip = false; // set when the record is corrupt
#ifdef DEBUG_VERBOSE
			fprintf(stderr, "next line: parse header: offset %td\n",l_s - (char *) buf);
#endif
			// Find the end of the line; an unfinished line at the end of the
			// buffer is left for the next read.
			char *l_e = l_s;   // ends up one past the line's '\n'
			while (*l_e++ != '\n') {
				if (l_e == lim) goto BufEnd;
			}
			// replace the newline with a terminator
			*(l_e - 1) = 0;

			// Assumes there are no commas inside the values.
			cl_object key_o;
			int curr_obj_size = 0;  // number of bins actually present
			cl_bin curr_bins[num_obj];
			char *str_value = NULL;
			int int_value = 0;
			key_appeared = false;

			for (int i = 0; i < num_obj && l_s != NULL; i++) {
				bool valid = true;
				bool empty = true;
				if (i == key_index) {
					key_appeared = true;
					if (strncmp(key_type, "str", 3) == 0) {
						str_value = l_s;
						l_s = getstr_comma_tok(&str_value, &empty);
						if (empty) {
							fprintf(stderr, "Key value not found in some record.\n");
							skip = true;
							break;
						}
						citrusleaf_object_init_str(&key_o, str_value);
					} else if (strncmp(key_type, "int", 3) == 0) {
						l_s = atoi_tok(l_s, &int_value, &empty, &valid);
						if (!valid) {
							skip = true;
							break;
						}
						if (empty) {
							fprintf(stderr, "Key value not found in some record.\n");
							skip = true;
							break;
						}
						citrusleaf_object_init_int(&key_o, int_value);
#ifdef DEBUG
						fprintf(stderr, "key.. %d\n",int_value );
#endif
					}
				} else if (bins_buf[i - key_appeared].object.type == CL_STR) {
					str_value = l_s;
					l_s = getstr_comma_tok(&str_value, &empty);
					if (!empty) {
						curr_bins[curr_obj_size] = bins_buf[i - key_appeared];
						citrusleaf_object_init_str(&(curr_bins[curr_obj_size].object), str_value);
						curr_obj_size++;
					}
				} else if (bins_buf[i - key_appeared].object.type == CL_INT) {
					l_s = atoi_tok(l_s, &int_value, &empty, &valid);
					if (!valid) {
						fprintf(stderr, "Ambigous data. Expecting integer instead of string.\n");
						skip = true;
						break;
					}
					if (!empty) {
						curr_bins[curr_obj_size] = bins_buf[i - key_appeared];
						citrusleaf_object_init_int(&(curr_bins[curr_obj_size].object), int_value);
						curr_obj_size++;
					}
				}
			}
			if (!key_appeared) {
				// the line ran out of fields before the key column
				fprintf(stderr, "Error: No key found for the record in the data file.\n");
				skip = true;
			}

			// whatever happened, the next record starts after this line
			r_s = l_e;
			if (skip) {
				continue;
			}
#ifdef DEBUG_VERBOSE
			fprintf(stderr, "load test: namespace %s\n",ns);
#endif
			cl_write_parameters cl_w_p;
			cl_write_parameters_set_default(&cl_w_p);
			cl_rv put_rv = citrusleaf_put(c->asc, ns, set, &key_o, curr_bins, curr_obj_size, &cl_w_p);
			if (put_rv != CITRUSLEAF_OK) {
				fprintf(stderr, "get failed in load: rv %d\n",put_rv);
				if (put_rv == 4) {
					fprintf(stderr, "Namespace mismatch\n");
				}
				atomic_int_add(c->fail_counter, 1);
			}
			atomic_int_add(c->records_counter, 1);
		} while (r_s < lim);

BufEnd:
		// Slide the unfinished record, if any, to the front of the buffer.
		// Source and destination may overlap, hence memmove.
		if (r_s < lim) {
			memmove(buf, r_s, (size_t) (lim - r_s));
#ifdef DEBUG
			fprintf(stderr, "%s: end of buffer: moved %zu bytes from %p to %p\n",filename,(size_t) (lim - r_s), (void *) r_s, (void *) buf);
#endif
		}
#ifdef DEBUG
		else if (r_s == lim) {
			fprintf(stderr, "%s: end of buffer: exact match in size: %p %p\n",filename, (void *) r_s, (void *) lim);
		}
		else {
			fprintf(stderr, "%s: end of buffer: record start is beyond limit: serious internal error: rs %p lim %p\n",filename, (void *) r_s, (void *) lim);
		}
#endif
		off = (size_t) (lim - r_s);
	}

	free(buf);
	return(rv);
}
// One unit of work: a single data file to be loaded by one worker thread.
// Entries are chained into singly linked lists through `next` (main() keeps
// a "todo" list and a "working" list of these).
typedef struct load_structure_s {
struct load_structure_s *next; // next entry in the list, 0 at the tail
config *c; // configuration passed on to load()
FILE *fd; // stream for the data file; opened and closed by do_load()
char *filename; // path of the data file; also used in diagnostics
pthread_t th; // handle of the worker thread, filled in by pthread_create()
} load_struct;
// Thread entry point for one data file. arg is the load_struct describing the
// job: the file it names is opened, handed to load() and closed again. A file
// that cannot be opened is reported on stderr. Always returns 0.
void *
do_load(void *arg)
{
	load_struct *job = (load_struct *) arg;

	job->fd = fopen(job->filename, "r");
	if (!job->fd) {
		fprintf(stderr, "cannot open file %s\n", job->filename);
		return(0);
	}

	load(job->filename, job->fd, job->c);
	// close the file & get out of dodge
	fclose(job->fd);
	return(0);
}
// State shared between the progress-reporting thread (counter_fn) and the
// code that starts and stops it.
// NOTE(review): `death` is a plain int written by one thread and read by
// another without any synchronization - confirm this is acceptable here.
typedef struct {
atomic_int *records; // counters printed by counter_fn() once per second
atomic_int *timeouts;
atomic_int *fails;
atomic_int *data_mismatches;
atomic_int *notfound;
int death; // set to 1 by stop_counter_thread() to make counter_fn() return
pthread_t th; // handle of the reporting thread
} counter_thread_control;
// Thread entry point of the progress reporter. arg is a
// counter_thread_control; until its death flag becomes non-zero the thread
// sleeps for a second and then prints the current value of every counter to
// stderr. Always returns 0.
void *
counter_fn(void *arg)
{
	counter_thread_control *ctl = (counter_thread_control *) arg;

	for (;;) {
		if (ctl->death != 0) {
			break;
		}
		sleep(1);
		fprintf(stderr, "load: records %"PRIu64" timeouts %"PRIu64" fails %"PRIu64" wrongdata %"PRIu64" notfound %"PRIu64"\n",
				atomic_int_get(ctl->records), atomic_int_get(ctl->timeouts) , atomic_int_get(ctl->fails),
				atomic_int_get(ctl->data_mismatches) , atomic_int_get(ctl->notfound) );
	}
	return(0);
}
// Starts the progress-reporting thread (counter_fn) for the given counters.
//
// Returns an opaque control handle to be passed to stop_counter_thread(), or
// 0 when the control block cannot be allocated or the thread cannot be
// created (a message is printed to stderr in both cases).
void *
start_counter_thread(atomic_int *records, atomic_int *timeouts, atomic_int *fails, atomic_int *data_mismatches, atomic_int *notfound)
{
	counter_thread_control *ctc = malloc(sizeof *ctc);
	if (ctc == NULL) {
		fprintf(stderr, "out of memory starting the counter thread\n");
		return(0);
	}
	ctc->records = records;
	ctc->timeouts = timeouts;
	ctc->fails = fails;
	ctc->data_mismatches = data_mismatches;
	ctc->notfound = notfound;
	ctc->death = 0;
	if (pthread_create(&ctc->th, 0, counter_fn, ctc) != 0) {
		// Without a running thread there is nothing to join later, so do not
		// hand out a handle with an invalid thread id.
		fprintf(stderr, "could not start the counter thread\n");
		free(ctc);
		return(0);
	}
	return(ctc);
}
// Stops the progress-reporting thread and releases its control block.
// control is the handle returned by start_counter_thread(); a null handle is
// accepted and ignored. The call blocks until the thread has noticed the
// request and exited, which can take up to about a second because the thread
// only checks the flag between sleeps.
void
stop_counter_thread(void *control)
{
	counter_thread_control *ctc = (counter_thread_control *) control;
	if (ctc == NULL) {
		return;
	}
	ctc->death = 1;
	pthread_join(ctc->th, 0);
	free(ctc);
}
// Prints the command line help, one option per line, to stderr.
void usage(void) {
	static const char *const help_lines[] = {
		"Usage key_c:\n",
		"-h host [default 127.0.0.1] \n",
		"-p port [default 3000]\n",
		"-t threads [default 40]\n",
		"-d directory [default cl_load]\n",
		"-n namespace [its compulsory to specify this.]\n",
		"-m milliseconds timeout [default 100]\n",
		"-v is verbose\n",
		"-s is set [default \"\"]\n",
		"-f is format file. [default format.csv]\n",
	};
	const size_t count = sizeof help_lines / sizeof help_lines[0];

	for (size_t i = 0; i < count; i++) {
		fputs(help_lines[i], stderr);
	}
}
// Unlinks and returns the last entry of the singly linked list *list.
// Returns 0 when list is null or the list is empty. When the only entry is
// removed, *list becomes 0.
load_struct *
pop_tail(load_struct **list)
{
	if (!list || !*list) {
		return(0);
	}

	// Walk the link that points at the current entry, so that the head
	// pointer and an interior `next` field are handled the same way.
	load_struct **link = list;
	while ((*link)->next) {
		link = &(*link)->next;
	}

	load_struct *tail = *link;
	*link = 0;
	return(tail);
}
int
main(int argc, char **argv)
{
config c;
memset(&c, 0, sizeof(c));
c.host = "127.0.0.1";
c.port = 3000;
c.ns = "";
c.set = "";
c.verbose = false;
c.directory = "cl_load";
c.n_threads = 40;
c.timeout = 100;
c.format = "format.csv";
// printf("restoring the cluster from a file\n");
int optcase;
while ((optcase = getopt(argc, argv, "h:p:n:s:f:d:t:m:v")) != -1)
{
switch (optcase)
{
case 'h':
c.host = strdup(optarg);
break;
case 'p':
c.port = atoi(optarg);
break;
case 'n':
c.ns = strdup(optarg);
break;
case 's':
c.set = strdup(optarg);
break;
case 'f':
c.format = strdup(optarg);
break;
case 'd':
c.directory = strdup(optarg);
break;
case 't':
c.n_threads = atoi(optarg);
break;
case 'm':
c.timeout = atoi(optarg);
break;
case 'v':
c.verbose = true;
break;
default:
usage();
return(-1);
}
}
if(!strcmp(c.ns, "")) {
fprintf(stderr, "Its mandatory to specify the namespace.\n");
exit(1);
}
fprintf(stderr, "Loading data: host %s port %d from directory %s\n",c.host,c.port,c.directory);
citrusleaf_init();
// create the cluster object - attach
cl_cluster *asc = citrusleaf_cluster_create();
if (!asc) { fprintf(stderr, "could not create cluster\n"); return(-1); }
if (0 != citrusleaf_cluster_add_host(asc, c.host, c.port, 0)) {
fprintf(stderr, "could not connect to host %s port %d\n",c.host,c.port);
return(-1);
}
c.asc = asc;
c.records_counter = atomic_int_create(0);
c.timeout_counter = atomic_int_create(0);
c.fail_counter = atomic_int_create(0);
c.data_mismatch_counter = atomic_int_create(0);
c.notfound_counter = atomic_int_create(0);
void *counter_tid = start_counter_thread( c.records_counter, c.timeout_counter, c.fail_counter,
c.data_mismatch_counter, c.notfound_counter );
DIR *dir = opendir(c.directory);
if (dir == 0) {
fprintf(stderr, "directory %s does not exist or can't be accessed or is not a directory\n",c.directory);
goto Cleanup;
}
load_struct *todo_head = 0;
// create the list of all work to do
struct dirent *dit;
int load_present = 0;
while (0 != (dit = readdir(dir))) {
char dn[512];
if(!strcmp(dit->d_name, c.format ) || !strcmp(dit->d_name, ".") || !strcmp(dit->d_name, ".."))
continue;
sprintf(dn, "%s/%s",c.directory,dit->d_name);
load_present =1;
load_struct *rs = malloc(sizeof(load_struct));
rs->c = &c;
rs->fd = 0;
rs->filename = strdup(dn);
rs->next = todo_head;
todo_head = rs;
}
if(load_present == 0) {
fprintf(stderr,"No load files present.\n");
goto Cleanup;
}
closedir(dir);
// fire off some threads but not all (or we might swamp everyone)
load_struct *working_head = 0;
int i=0;
while ( (i < c.n_threads) && todo_head) {
// off the todo queue
load_struct *rs = todo_head;
todo_head = rs->next;
// onto the working queue
rs->next = working_head;
working_head = rs;
// and off we go!
pthread_create( & rs->th, 0, do_load, rs);
i++;
}
// for every thread that completes, fire off another.
// Annoyingly, we really can't simply look at the head - need head and tail.
// but performance on the list is unimportant
while (working_head) {
// snap off end
load_struct *e = pop_tail(&working_head);
void *res; // result from the join; ignored
pthread_join( e->th, &res );
#ifdef DEBUG
fprintf(stderr, "completed file %s\n",e->filename);
#endif
free(e->filename);
free(e);
// start another one to tha head, if available
if (todo_head) {
e = todo_head;
todo_head = e->next;
e->next = working_head;
working_head = e;
pthread_create(&e->th, 0, do_load, e);
}
}
Cleanup:
stop_counter_thread(counter_tid);
citrusleaf_cluster_destroy(asc);
return(0);
}