ConnectionError: (-10L, 'Socket read error: 104, :3000, 35382', 'src/main/aerospike/as_socket.c', 248, False)

Hi All,

I am trying to add a new bin to an existing set that holds millions of records. Below is the code:

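import traceback
from aerospike import predicates as p  # assumed import behind the `p` alias

def testfunc(client):
    try:
        # Secondary-index query: records whose indexed bin equals 0
        query = client.query('ns', 'set').where(p.equals(dummyidx, 0))
        queryrs = query.results({'total_timeout': 1000000})

        for (key, meta, bins) in queryrs:
            # Add the new bin only when it is missing or None
            if 'rs' not in bins or bins['rs'] is None:
                bins['rs'] = 1000
            client.put(key, bins)
    except Exception as e:
        # The with block closes the file, so no explicit f.close() is needed
        with open(logfile, 'a') as f:
            f.write(str(e))
            f.write(traceback.format_exc())
        print('An exception occurred in testfunc. Check the log file for details.')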

I am getting a ConnectionError. Please let me know the best way to add a bin to an existing set.

Thanks, Gajendra

Is your client configured correctly? Based on the error message you provided, I would have expected an IP address in front of the port (:3000). Are you able to do any operations? Is the query working? I am not an expert, but that error message really should include an IP address or host name…

Hi Meher,

Sorry for the late response. I am able to do operations on other sets. I manually removed the IP address before posting the issue. I modified the code as shown below and it is working fine, but it takes 90 minutes for 7 million records. Is there any way to process them faster?

import traceback

def test(client):
    try:
        rs = client.scan('ns', 'set')

        def callback(input_tuple):
            (key, meta, rec) = input_tuple
            # Add the new bin only when it is missing or None
            if 'bin1' not in rec or rec['bin1'] is None:
                rec['bin1'] = 1000
            client.put(key, rec)

        # Invoke the callback once for every record returned by the scan
        rs.foreach(callback)
    except Exception as e:
        # The with block closes the file, so no explicit f.close() is needed
        with open('/var/log/rs.log', 'a') as f:
            f.write(str(e))
            f.write(traceback.format_exc())
        print('An exception occurred in test. Check the log file for details.')
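As a rough sanity check on the figures quoted above (7 million records in 90 minutes), the arithmetic below converts them into a per-record cost. Reading the result as "roughly one synchronous round trip per record" is an interpretation, not a measurement.

```python
# Figures quoted in the post above.
records = 7_000_000
elapsed_seconds = 90 * 60  # 90 minutes

records_per_second = records / elapsed_seconds
ms_per_record = 1000 * elapsed_seconds / records

print(f"{records_per_second:.0f} records/s, {ms_per_record:.2f} ms per record")
# prints "1296 records/s, 0.77 ms per record"
```

At well under a millisecond per record, the loop is probably dominated by the per-record put issued from the client, which would explain why moving the work to the server helps.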

It seems you are using a scan (primary index query) to select only the records of a particular set. This can be slow if the namespace is large (see this article: FAQ - Scans in Aerospike, specifically the set index part). You may also want to look at background operation scans, which avoid returning each record to the client. I am not an expert on those, but I am pretty sure this can be done in one shot from the client with the right expression, and the server will do it fully in the background. If you have an index on the set, it should run as fast as the database storage allows. Here is the doc on Expressions.
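If the records do have to pass through the client, one way to reduce the work per record is to skip records that already have the bin and to send only the new bin instead of the whole record. The sketch below is a minimal illustration, not a tested fix: `backfill_record` and `make_callback` are hypothetical helper names, and it assumes the default write policy, under which `put` leaves the other bins of an existing record untouched.

```python
NEW_BIN = 'bin1'        # bin name taken from the post above
DEFAULT_VALUE = 1000    # default value taken from the post above


def backfill_record(client, key, rec):
    """Write NEW_BIN only when it is missing or None.

    Returns True if a write was issued, False if the record was skipped.
    """
    if rec.get(NEW_BIN) is not None:
        return False  # bin already populated, skip the round trip
    # Send only the new bin rather than the whole record
    # (assumes the default write policy, which updates the listed bins only).
    client.put(key, {NEW_BIN: DEFAULT_VALUE})
    return True


def make_callback(client):
    """Build a callback (hypothetical helper) suitable for scan.foreach()."""
    def callback(input_tuple):
        key, _meta, rec = input_tuple
        backfill_record(client, key, rec)
    return callback
```

With a connected client this would be used as `client.scan('ns', 'set').foreach(make_callback(client))`. It still returns every record to the client, so a background scan is likely to remain the faster option.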

I actually tried the following code to add a new bin (I am not a Python dev; I just put some pieces together and tried them in the sandbox):

# Import Aerospike client libraries
import aerospike
from aerospike import exception as ex
from aerospike_helpers.operations import operations as op_helpers

# Define the connection config
config = {
    'hosts': [ ('127.0.0.1', 3000) ]
}

# Establish connection to the server
client = aerospike.client(config).connect()
print('Setup complete')

try:
    scan = client.scan('sandbox', 'ufodata')
    # Write the integer 1000 to 'bin1' on every scanned record
    # (append('bin1', '1000') would instead append the string '1000')
    ops = [op_helpers.write('bin1', 1000)]
    scan.add_ops(ops)

    # Run on the server in the background; returns a job ID
    job_id = scan.execute_background()
except ex.AerospikeError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
finally:
    # Close the client
    client.close()

You can add a filter expression to select the specific records that get the new bin, based on the values of other bins, etc. Just for the sake of example (taken from the Python API doc):

    from aerospike_helpers import expressions as exp
    # check that the record has value < 2 or value == 3 in bin 'number'
    expr = exp.Or(
        exp.LT(exp.IntBin("number"), 2),
        exp.Eq(exp.IntBin("number"), 3)
    ).compile()

    policy = {
        'expressions': expr
    }
    scan.execute_background(policy)

Basically, construct the expression to filter and pass it in as an argument to the background ops scan.

Thanks for your reply @meher.
