python - MySQL keeps losing connection during celery tasks -
i'm trying process entire csv file fast possible, i'm looking process each line in parallel celery task. cleanup, celery task, has wait until every line processed. see example below.
the problem is, can't seem through file because keep running connection errors mysql. far, i've seen these 2 errors: 2013, 'lost connection mysql server during query' , 2006, 'mysql server has gone away'
from app.db.meta import session celery import chord, celery celery.signals import task_postrun celery = celery() celery.config_from_object('config') @task_postrun.connect def close_session(*args, **kwargs): session.remove() def main(): # process each line in parallel header = [process_line.s(line) line in csv_file] # pass stats cleanup after lines processed callback = cleanup.s() chord(header)(callback) @celery.task def process_line(line): session = session() ... # process line ... return stats @celery.task def cleanup(stats): session = session() ... # cleanup , log stats ... i'm using celery 3.1.18 , sqlalchemy 0.9.9. i'm using connection pooling well.
mysql> show full processlist; +----+------+-----------+-----------------+---------+------+-------+-----------------------+ | id | user | host | db | command | time | state | info | +----+------+-----------+-----------------+---------+------+-------+-----------------------+ | 1 | root | localhost | ab__development | sleep | 4987 | | null | | 11 | root | localhost | ab__development | sleep | 1936 | | null | | 16 | root | localhost | ab__development | sleep | 143 | | null | | 17 | root | localhost | ab__development | sleep | 1045 | | null | | 18 | root | localhost | null | query | 0 | init | show full processlist | | 21 | root | localhost | ab__development | sleep | 7 | | null | +----+------+-----------+-----------------+---------+------+-------+-----------------------+ 6 rows in set (0.01 sec)
read answer. in short have either disable sqlalchemy's pool engine or try ping mysql server:
from flask.ext.sqlalchemy import sqlalchemy sqlalchemy import event, exc def instance(app): """:rtype: sqlalchemy""" db = sqlalchemy(app) if app.testing: return db @event.listens_for(db.engine, 'checkout') def checkout(dbapi_con, con_record, con_proxy): try: try: dbapi_con.ping(false) except typeerror: app.logger.debug('mysql connection died. restoring...') dbapi_con.ping() except dbapi_con.operationalerror e: app.logger.warning(e) if e.args[0] in (2006, 2013, 2014, 2045, 2055): raise exc.disconnectionerror() else: raise return db
Comments
Post a Comment