Using multiprocessing in python to improve Cassandra write instructions not working -
i trying improve cassandra database write performance using multiprocessing in python given here time taken process has increased lot. want know if making mistake in code. posting python code snippet. inserting data 2 tables using 2 different worker methods. first worker
def worker(daymonthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, p): cluster = cluster(['127.0.0.1']) metadata = cluster.metadata session = cluster.connect() session.execute("use db;") print current_process().name session.execute("insert db.day (daymonthyear, ts, c_country, c_lat, c_lon, e_sma, e_dma, e_etype, ip_version, ip_ihl, ip_tos_dscp, ip_totallen, ip_idnum, ip_fragoff, ip_ttl, ip_proto, ip_hdrchksm, ip_sip, ip_dip, ip_opts, s_sp, s_dp, s_vtag, s_chksm) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);",(str(daymonthyear), int(ts1), str(country), str(lat), str(lon), str(sma), str(dma), str(etype), str(version), str(ihl), str(tos_dscp), int(totallen), int(idnum), str(fragoff), int(ttl), int(proto), str(hdrchksm), str(sip), str(dip), str(opts), int(s_sp), int(s_dp), int(s_vtag), str(s_chksm))) session.cluster.shutdown() session.shutdown()
second worker:
def worker1(monthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, p): cluster = cluster(['127.0.0.1']) metadata = cluster.metadata session = cluster.connect() session.execute("use db;") print current_process().name session.execute("insert db.month (monthyear, ts, c_country, c_lat, c_lon, e_sma, e_dma, e_etype, ip_version, ip_ihl, ip_tos_dscp, ip_totallen, ip_idnum, ip_fragoff, ip_ttl, ip_proto, ip_hdrchksm, ip_sip, ip_dip, ip_opts, u_sp, u_dp, u_len, u_chksm) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);",(str(monthyear), int(ts1), str(country), str(lat), str(lon), str(sma), str(dma), str(etype), str(version), str(ihl), str(tos_dscp), int(totallen), int(idnum), str(fragoff), int(ttl), int(proto), str(hdrchksm), str(sip), str(dip), str(opts), int(u_sp), int(u_dp), int(u_len), str(u_chksm))) session.cluster.shutdown() session.shutdown()
calling method:
def dump(): ts1,buf in pcap: if ip.p == dpkt.ip.ip_proto_tcp: res = pool.apply_async(worker, args=(daymonthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,)) res.wait() res = pool.apply_async(worker1, args=(monthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,)) res.wait() if type(ip.data) == udp : res = pool.apply_async(worker, args=(daymonthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,)) res.wait() res = pool.apply_async(worker1, args=(monthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,)) res.wait()
all variables used declared , there no error in code. problem takes more time executing insert statement sequentially in dump method. can tell if using multiprocessing in right way or not?
connecting cassandra expansive. if connect each process, spending more resources. evident when number of processes increases. making n connections (for large n) dosing in principle.
Comments
Post a Comment