Aeropike_key_operate and as_policy_operate issues


#1

Hi, I’m trying to write a client app that will run multiple instances and potentially try to update the same bin on the same record, so want to make use of policies to ensure updates are applied sanely. The example code below works fine w.r.t record creation, but adding a bin to the record always fails.

Is anyone able to tell me what I’m doing wrong as I’ve not been able to find the solution in client examples or in the forums?

Many thanks, rdev

main(int argc, char *argv[]) {
        char host[] = "127.0.0.1";
        uint16_t port = 3000;
        aerospike as;
        as_error err;

        as_record *p_rec;

        // Initialize cluster configuration.
        as_config cfg;
        as_config_init(&cfg);
        as_config_add_host(&cfg, host, port);

        aerospike_init(&as, &cfg);

        // Connect to the Aerospike database cluster.
        if (aerospike_connect(&as, &err) != AEROSPIKE_OK) {
                fprintf(stderr,"aerospike_connect() returned %d - %s", err.code, err.message);
                aerospike_destroy(&as);
                exit(-1);
        }

        fprintf(stderr,"aerospike_connect() OK!\n");

        char aerospike_namespace[] = "db";
        char aerospike_set[] = "test";

        as_record rec;
        as_record_inita(&rec,1);
        as_record_set_str(&rec, "test-bin-1", "abc");

        as_key g_key;
        as_key_init_int64(&g_key,aerospike_namespace,aerospike_set,123);


        as_policy_write wpolicy;
        as_policy_write_init(&wpolicy);
        wpolicy.gen = AS_POLICY_GEN_EQ;                // Ensure no changes have occurred
        wpolicy.exists = AS_POLICY_EXISTS_CREATE;      // Only create if doesnt exist

        // if (aerospike_key_put(&as, &err, NULL, &g_key, &rec) != AEROSPIKE_OK) {
        if (aerospike_key_put(&as, &err, &wpolicy, &g_key, &rec) != AEROSPIKE_OK) {
                fprintf(stderr,"aerospike_key_put error (%d) %s at [%s:%d]\n",err.code, err.message, err.file, err.line);
        }

        // Re-read the record to determine generation token:
        if (aerospike_key_get(&as, &err, NULL, &g_key, &p_rec) != AEROSPIKE_OK) {
                fprintf(stderr,"aerospike_key_get error (%d) %s at [%s:%d]\n",err.code, err.message, err.file, err.line);
        }
        fprintf(stderr,"AFTER GET: p_rec->gen %u, p_rec->ttl %u\n",p_rec->gen,p_rec->ttl);


as_digest *digest = as_key_digest(&g_key); fprintf(stderr,"g_key digest ("); int i; for (i = 0 ; i < 20 ; i++) { fprintf(stderr,"%02X",g_key.digest.value[i]); }; fprintf(stderr,")\n");


        as_policy_operate opolicy;
        as_policy_operate_init(&opolicy);
        opolicy.gen = AS_POLICY_GEN_EQ;                // Ensure no changes have occurred since retrieval

        as_operations ops;
        as_operations_inita(&ops, 1);

        as_operations_add_prepend_str(&ops, "test-bin-2", "def");

        // if (aerospike_key_operate(&as, &err, NULL, &g_key, &ops, &p_rec) != AEROSPIKE_OK) {
        if (aerospike_key_operate(&as, &err, &opolicy, &g_key, &ops, &p_rec) != AEROSPIKE_OK) {
                fprintf(stderr,"aerospike_key_operate() returned %d - %s\n", err.code, err.message);
                aerospike_destroy(&as);
                exit(-1);
        }

        fprintf(stderr,"AFTER MAP BIN PUT: p_rec->gen %u, p_rec->ttl %u\n",p_rec->gen,p_rec->ttl);

        as_operations_destroy(&ops);
        if (aerospike_key_get(&as, &err, NULL, &g_key, &p_rec) != AEROSPIKE_OK) {
                fprintf(stderr,"aerospike_key_get error (%d) %s at [%s:%d]\n",err.code, err.message, err.file, err.line);
        }
        fprintf(stderr,"AFTER GET: p_rec->gen %u, p_rec->ttl %u\n",p_rec->gen,p_rec->ttl);

        as_key_destroy(&g_key);
}

Output from First run:

aerospike_connect() OK! AFTER GET: p_rec->gen 1, p_rec->ttl 2592000 g_key digest (E3892809FBC1FAFFEAF58CF0A30E34E7456B73AB) aerospike_key_operate() returned 3 - AEROSPIKE_ERR_RECORD_GENERATION Aerospike Query Client Version 3.9.1 Copyright 2012-2016 Aerospike. All rights reserved. aql> set RECORD_PRINT_METADATA true aql> select * from db.test ±-----------±-------------------------------±--------±------+ | test-bin-1 | {digest} | {ttl} | {gen} | ±-----------±-------------------------------±--------±------+ | “abc” | “44koCfvB+v/q9Yzwow4050Vrc6s=” | 2591946 | 1 | ±-----------±-------------------------------±--------±------+ 1 row in set (0.019 secs)

The second run will fail on both the aerospike_key_put (which is expected) and aerospike_key_operate.

aerospike_connect() OK! aerospike_key_put error (5) AEROSPIKE_ERR_RECORD_EXISTS at [src/main/aerospike/as_command.c:594] AFTER GET: p_rec->gen 1, p_rec->ttl 2591909 g_key digest (E3892809FBC1FAFFEAF58CF0A30E34E7456B73AB) aerospike_key_operate() returned 3 - AEROSPIKE_ERR_RECORD_GENERATION Aerospike Query Client Version 3.9.1 Copyright 2012-2016 Aerospike. All rights reserved. aql> set RECORD_PRINT_METADATA true aql> select * from db.test ±-----------±-------------------------------±--------±------+ | test-bin-1 | {digest} | {ttl} | {gen} | ±-----------±-------------------------------±--------±------+ | “abc” | “44koCfvB+v/q9Yzwow4050Vrc6s=” | 2591897 | 1 | ±-----------±-------------------------------±--------±------+ 1 row in set (0.018 secs)


#2

AS_POLICY_GEN_EQ is used for a Read/Modify/Write sequence where you want to cancel your write if some other process already wrote between your read and write. You have to read the record generation before attempting AS_POLICY_GEN_EQ.

Either remove AS_POLICY_GEN_EQ or read the record and then set the actual generation on the write.

as_record* p_rec = NULL;
aerospike_key_get(&as, &err, NULL, &g_key, &p_rec);
     
as_policy_write wpolicy;
as_policy_write_init(&wpolicy);
wpolicy.gen = AS_POLICY_GEN_EQ;                // Ensure no changes have occurred
wpolicy.exists = AS_POLICY_EXISTS_CREATE;      // Only create if doesnt exist
rec.gen = p_rec->gen;

aerospike_key_put(&as, &err, &wpolicy, &g_key, &rec);

#3

Thanks Brian, I don’t have problems with aerospike_key_put, but with the aerospike_key_operate which takes an as_policy_operate structure rather than as_policy_write. Also fwiw in the example above I already read the record after initial operation before attempting to use aerospike_key_operate.


#4

I’ve found the cause. I needed to set the operation generation token to be the same value as the record generation token. ie:

ops.gen = p_rec->gen ; if (aerospike_key_operate(&as, &err, &opolicy, &g_key, &ops, &p_rec) != AEROSPIKE_OK) { …

Hope this hjelps someone else, rdev.