Using multiprocessing in python to improve Cassandra write instructions not working -


I am trying to improve Cassandra database write performance using multiprocessing in Python, as given here, but the time taken by the process has increased a lot. I want to know if I am making a mistake in my code. I am posting my Python code snippet. I am inserting data into 2 tables using 2 different worker methods. First worker:

    def worker(daymonthyear, ts1, country, lat, lon, sma, dma, etype, version,
               ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm,
               sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff,
               t_flags, t_winsz, t_chksm, t_urgptr, t_opts, p):
        """Insert one packet row into ``db.day``.

        Opens a connection to the Cassandra node at 127.0.0.1, prints the name
        of the current process, executes a single INSERT and shuts the cluster
        down again.  The connection is created and torn down on every call.

        ``daymonthyear`` through ``opts`` are bound to the first twenty
        columns (``ts1`` goes to ``ts``).  The ``t_*`` parameters and ``p``
        are accepted for call compatibility but are not written by this
        statement.

        ``Cluster`` is expected to be ``cassandra.cluster.Cluster`` from the
        DataStax Python driver, already imported by the surrounding module.
        """
        # The driver class is `Cluster`; assigning the result to a local that
        # has the same name as the callable raises UnboundLocalError.
        cluster = Cluster(['127.0.0.1'])
        try:
            session = cluster.connect()
            session.execute("use db;")
            # Parenthesised single-argument form works on Python 2 and 3.
            print(current_process().name)

            # NOTE(review): s_sp, s_dp, s_vtag and s_chksm are not parameters
            # of this function; they must resolve at module level inside the
            # worker process. Confirm they hold the values for *this* packet.
            row = (
                str(daymonthyear), int(ts1), str(country), str(lat), str(lon),
                str(sma), str(dma), str(etype), str(version), str(ihl),
                str(tos_dscp), int(totallen), int(idnum), str(fragoff),
                int(ttl), int(proto), str(hdrchksm), str(sip), str(dip),
                str(opts), int(s_sp), int(s_dp), int(s_vtag), str(s_chksm),
            )
            # CQL requires "insert into <table>"; the keyword was missing.
            session.execute(
                "insert into db.day (daymonthyear, ts, c_country, c_lat, c_lon, "
                "e_sma, e_dma, e_etype, ip_version, ip_ihl, ip_tos_dscp, "
                "ip_totallen, ip_idnum, ip_fragoff, ip_ttl, ip_proto, "
                "ip_hdrchksm, ip_sip, ip_dip, ip_opts, s_sp, s_dp, s_vtag, "
                "s_chksm) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,"
                "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);",
                row,
            )
        finally:
            # Shutting down the cluster also closes its sessions, so a
            # separate session.shutdown() is unnecessary; doing it in
            # `finally` releases the connections even if a statement fails.
            cluster.shutdown()

second worker:

    def worker1(monthyear, ts1, country, lat, lon, sma, dma, etype, version,
                ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm,
                sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff,
                t_flags, t_winsz, t_chksm, t_urgptr, t_opts, p):
        """Insert one packet row into ``db.month``.

        Opens a connection to the Cassandra node at 127.0.0.1, prints the name
        of the current process, executes a single INSERT and shuts the cluster
        down again.  The connection is created and torn down on every call.

        ``monthyear`` through ``opts`` are bound to the first twenty columns
        (``ts1`` goes to ``ts``).  The ``t_*`` parameters and ``p`` are
        accepted for call compatibility but are not written by this statement.

        ``Cluster`` is expected to be ``cassandra.cluster.Cluster`` from the
        DataStax Python driver, already imported by the surrounding module.
        """
        # The driver class is `Cluster`; assigning the result to a local that
        # has the same name as the callable raises UnboundLocalError.
        cluster = Cluster(['127.0.0.1'])
        try:
            session = cluster.connect()
            session.execute("use db;")
            # Parenthesised single-argument form works on Python 2 and 3.
            print(current_process().name)

            # NOTE(review): u_sp, u_dp, u_len and u_chksm are not parameters
            # of this function; they must resolve at module level inside the
            # worker process. Confirm they hold the values for *this* packet.
            row = (
                str(monthyear), int(ts1), str(country), str(lat), str(lon),
                str(sma), str(dma), str(etype), str(version), str(ihl),
                str(tos_dscp), int(totallen), int(idnum), str(fragoff),
                int(ttl), int(proto), str(hdrchksm), str(sip), str(dip),
                str(opts), int(u_sp), int(u_dp), int(u_len), str(u_chksm),
            )
            # CQL requires "insert into <table>"; the keyword was missing.
            session.execute(
                "insert into db.month (monthyear, ts, c_country, c_lat, c_lon, "
                "e_sma, e_dma, e_etype, ip_version, ip_ihl, ip_tos_dscp, "
                "ip_totallen, ip_idnum, ip_fragoff, ip_ttl, ip_proto, "
                "ip_hdrchksm, ip_sip, ip_dip, ip_opts, u_sp, u_dp, u_len, "
                "u_chksm) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,"
                "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);",
                row,
            )
        finally:
            # Shutting down the cluster also closes its sessions, so a
            # separate session.shutdown() is unnecessary; doing it in
            # `finally` releases the connections even if a statement fails.
            cluster.shutdown()

calling method:

def dump():
    """Queue the day/month inserts for every TCP or UDP packet in ``pcap``.

    For each ``(ts1, buf)`` pair read from ``pcap`` the function submits
    ``worker`` and ``worker1`` to ``pool`` when the packet is TCP, and again
    when its payload is UDP.  All submissions are made first and only then
    awaited, so the pool's processes can actually run concurrently; waiting
    on each result right after submitting it makes the whole loop sequential.

    NOTE(review): ``pool``, ``pcap``, ``ip`` and the per-packet field values
    (``country``, ``lat``, ... ``process_n``) are not defined in this snippet;
    they are assumed to be provided by the surrounding code.
    """
    pending = []

    def submit_pair():
        # Both workers take the same trailing arguments; only the leading
        # partition key (day vs. month) differs.
        shared = (ts1, country, lat, lon, sma, dma, etype, version, ihl,
                  tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm,
                  sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff,
                  t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n)
        pending.append(
            pool.apply_async(worker, args=(daymonthyear,) + shared))
        pending.append(
            pool.apply_async(worker1, args=(monthyear,) + shared))

    for ts1, buf in pcap:
        # dpkt exposes the protocol number as the upper-case IP_PROTO_TCP.
        if ip.p == dpkt.ip.IP_PROTO_TCP:
            submit_pair()
        # NOTE(review): `udp` must name the UDP packet class (for dpkt that
        # is dpkt.udp.UDP) for this comparison to ever be true; confirm.
        if type(ip.data) == udp:
            submit_pair()

    # Block until every queued insert has finished before returning, which
    # keeps the original guarantee that dump() returns after all writes.
    for res in pending:
        res.wait()

All the variables used are declared, and there is no error in the code. The problem is that it takes more time than executing the insert statements sequentially in the dump method. Can anyone tell me if I am using multiprocessing in the right way or not?

Connecting to Cassandra is expensive. If you connect in each process, you are spending more resources. This becomes evident as the number of processes increases. Making n connections (for large n) is, in principle, DoSing the database.


Comments

Popular posts from this blog

Magento/PHP - Get phones on all members in a customer group -

php - .htaccess mod_rewrite for dynamic url which has domain names -

Website Login Issue developed in magento -