8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

Python mysql(使用pymysql)自动重新连接

rischan 1月前

103 0

我不确定这是否可行,但我正在寻找一种在连接丢失时重新连接到 mysql 数据库的方法。所有连接都保存在 gevent 队列中,但我认为这不重要...

我不确定这是否可行,但我正在寻找一种在连接丢失时重新连接到 mysql 数据库的方法。所有连接都保存在 gevent 队列中,但我认为这不重要。我确信如果我花点时间,我可以想出一种重新连接到数据库的方法。然而,我浏览了 pymysql 代码,发现 Connection 类中有一个“ping”方法,我不确定如何使用它。

该方法看起来第一次会重新连接,但之后它又将重新连接标志切换为 False?我可以使用这种方法吗?或者如果丢失连接,是否有其他方法建立连接?即使不是 pymysql,人们如何应对数据库服务器宕机并必须重新建立与 mysql 服务器的连接?

def ping(self, reconnect=True):
    ''' Check if the server is alive '''
    if self.socket is None:
        if reconnect:
            self._connect()
            reconnect = False
        else:
            raise Error("Already closed")
    try:
        self._execute_command(COM_PING, "")
        return self._read_ok_packet()
    except Exception:
        if reconnect:
            self._connect()
            return self.ping(False)
        else:
            raise
帖子版权声明 1、本帖标题:Python mysql(使用pymysql)自动重新连接
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由rischan在本站《mysql》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 不确定这是否有用,但请看一下 Twisted 的 ReconnectingConnectionPool 配方

  • 我在 PyMySQL 文档上找到了使用“连接”的“ping”方法的解决方案,并给出了一个例子

  • 最终得到了一个可行的解决方案,可能会对某些人有所帮助。

    from gevent import monkey
    monkey.patch_socket()
    import logging
    
    import gevent
    from gevent.queue import Queue
    import pymysql as db
    
    logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger("connection_pool")
    
    
    class ConnectionPool:
        def __init__(self, db_config, time_to_sleep=30, test_run=False):
            self.username = db_config.get('user')
            self.password = db_config.get('password')
            self.host = db_config.get('host')
            self.port = int(db_config.get('port'))
            self.max_pool_size = 20
            self.test_run = test_run
            self.pool = None
            self.time_to_sleep = time_to_sleep
            self._initialize_pool()
    
        def get_initialized_connection_pool(self):
            return self.pool
    
        def _initialize_pool(self):
            self.pool = Queue(maxsize=self.max_pool_size)
            current_pool_size = self.pool.qsize()
            if current_pool_size < self.max_pool_size:  # this is a redundant check, can be removed
                for _ in xrange(0, self.max_pool_size - current_pool_size):
                    try:
                        conn = db.connect(host=self.host,
                                          user=self.username,
                                          passwd=self.password,
                                          port=self.port)
                        self.pool.put_nowait(conn)
    
                    except db.OperationalError, e:
                        LOGGER.error("Cannot initialize connection pool - retrying in {} seconds".format(self.time_to_sleep))
                        LOGGER.exception(e)
                        break
            self._check_for_connection_loss()
    
        def _re_initialize_pool(self):
            gevent.sleep(self.time_to_sleep)
            self._initialize_pool()
    
        def _check_for_connection_loss(self):
            while True:
                conn = None
                if self.pool.qsize() > 0:
                    conn = self.pool.get()
    
                if not self._ping(conn):
                    if self.test_run:
                        self.port = 3306
    
                    self._re_initialize_pool()
    
                else:
                    self.pool.put_nowait(conn)
    
                if self.test_run:
                    break
                gevent.sleep(self.time_to_sleep)
    
        def _ping(self, conn):
            try:
                if conn is None:
                    conn = db.connect(host=self.host,
                                      user=self.username,
                                      passwd=self.password,
                                      port=self.port)
                cursor = conn.cursor()
                cursor.execute('select 1;')
                LOGGER.debug(cursor.fetchall())
                return True
    
            except db.OperationalError, e:
                LOGGER.warn('Cannot connect to mysql - retrying in {} seconds'.format(self.time_to_sleep))
                LOGGER.exception(e)
                return False
    
    # test (pytest compatible) -------------------------------------------------------------------------------------------
    import logging
    
    from src.py.ConnectionPool import ConnectionPool
    
    logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger("test_connection_pool")
    
    
    def test_get_initialized_connection_pool():
        config = {
            'user': 'root',
            'password': '',
            'host': '127.0.0.1',
            'port': 3305
        }
        conn_pool = ConnectionPool(config, time_to_sleep=5, test_run=True)
        pool = conn_pool.get_initialized_connection_pool()
        # when in test run the port will be switched back to 3306
        # so the queue size should be 20 - will be nice to work 
        # around this rather than test_run hack
        assert pool.qsize() == 20
    
  • 好吧,我的应用程序中遇到了同样的问题,我在 PyMySQL 文档 ,它可以 ping 到服务器并检查连接是否已关闭,如果已关闭,则它会重新连接。

    from pymysql import connect
    from pymysql.cursors import DictCursor
    
    # create the connection
    connection = connect(host='host', port='port', user='user', 
                         password='password', db='db', 
                         cursorclass=DictCursor)
    
    # get the cursor
    cursor = connection.cursor()
    
    # if the connection was lost, then it reconnects
    connection.ping(reconnect=True)      
    
    # execute the query
    cursor.execute(query)
    

    我希望它有帮助。

  • 值得注意的是,ping 是一个便利功能,而 reconnect=True 执行 pymysql.connection.connect(),这意味着任何连接设置(模式、自动提交、tx 隔离)都应该重新配置。

  • @RichAndrews 您是说我之前设置的 MySQL 变量被清除了,我需要重置它们吗?

  • @Volatil3 我还没有把它分开,pymysql 客户端中的 reconnect=True 将成为一个新的 MySQL 客户端连接,并假定任何客户端会话状态都将被清除。如果有人有其他想法,请听听。

  • JK. 1月前 0 只看Ta
    引用 9

    最简单的方法是在发送查询之前检查连接。

    您可以通过创建一个包含两种方法的小类来实现此目的: connect query

    import pymysql
    import pymysql.cursors
    
    class DB:
        def connect(self):
            self.conn = pymysql.connect(
                                 host=hostname,
                                 user=username,
                                 password=password,
                                 db=dbname,
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor,
                                 port=3306)
    
        def query(self, sql):
            try:
                cursor = self.conn.cursor()
                cursor.execute(sql)
            except pymysql.OperationalError:
                self.connect()
                cursor = self.conn.cursor()
                cursor.execute(sql)
            return cursor
    
    db = DB()
    

    现在,无论何时您使用 db.query("example SQL") 请求发送查询,它都会自动准备好遇到连接错误,并 self.connect() 在需要时重新连接。

    请记住: 这是一个简化的示例。通常,您会希望让 PyMySQL 帮助您转义查询中的特殊字符。为此,您必须在方法中添加第二个参数 query 并从那里开始。

  • 引用 10

    您永远都不应该使用 \'except:\',甚至不应该限制为 Exception。在这种情况下,它至少应该是 pymssql.StandardError。通常,您不想捕获 SystemExit 异常 sys.exit(),例如,为了防止进程关闭,因为继续执行该异常将使您的进程处于非常不稳定的状态。

  • 逻辑很简单,如果连接关闭则尝试重新连接几次,在这种情况下我使用最大尝试 15 次来重新连接或 ping。

    import pymysql, pymysql.cursors
    conn = pymysql.connect(
                             host=hostname,
                             user=username,
                             password=password,
                             db=dbname,
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor,
                             )
    cursor = conn.cursor()
    # you can do transactions to database and when you need conn later, just make sure the server is still connected
    if conn.open is False:
       max_try = 15
       try = 0
       while conn.open is False:
           if try < max_try:
               conn.ping() # autoreconnect is true by default
           try +=1
    
    # check the conn again to make sure it connected
    if conn.open:
        # statements when conn is successfully reconnect to the server
    else:
        # it must be something wrong : server, network etc
    
  • conn.open 不在 PEP 249 中,它似乎检查是否存在套接字对象(在 SSL 上下文中)。该检查可能无法保证当远程服务器消失时,如果底层套接字选择/轮询时连接无法正常工作,conn.open 将返回 false。

  • 很老了,但我在程序中访问托管数据库时遇到了类似的问题。我最终使用的解决方案是创建一个装饰器,在进行查询时自动重新连接。

    给定一个连接函数:

    def connect(self):
        self.conn = mysql.connector.connect(host=self.host, user=self.user, 
        database=self.database, password=self.password)
        self.cursor = self.conn.cursor()
        print("Established connectionn...")
    

    我创建了

    def _reconnect(func):
        @wraps(func)
        def rec(self,*args,**kwargs):
            try:
                result = func(self,*args,**kwargs)
                return result
            except (mysql.connector.Error, mysql.connector.Warning) as e:
                self.connect()
                result = func(self,*args,**kwargs)
                return result
        return rec 
    

    这样任何使用连接的函数现在都可以被修饰成这样

    @_reconnect
    def check_user_exists(self,user_id):
        self.cursor.execute("SELECT COUNT(*) FROM _ where user_id={};".format(user_id))
        if self.cursor.fetchall()[0][0]==0:
            return False 
        else:
            return True
    

    该装饰器将重新建立连接并重新运行涉及数据库查询的任何功能。

  • 您可以使用属性来使连接在每次查询时保持活动状态:

    import pymysql
    import pymysql.cursors
    import pandas as pd 
    
    class DB:
        def __init__(self, hostname='1.1.1.1', username='root', password='password',
                     database=None, port=3306, charset="utf8mb4"):
            self.hostname = hostname
            self.database = database
            self.username = username 
            self.password = password
            self.port = port
            self.charset = charset
            self.connect()
        
        @property
        def conn(self): 
            if not self.connection.open:
                print('Going to reconnect')
            self.connection.ping(reconnect=True)
            return self.connection
    
        def connect(self):
            self.connection = pymysql.connect(
                                 host=self.hostname,
                                 user=self.username,
                                 password=self.password,
                                 db=self.database,
                                 charset=self.charset,
                                 cursorclass=pymysql.cursors.DictCursor,
                                 port=self.port)
    
        def query(self, sql):
            return pd.read_sql_query(sql, con=self.conn)
    
    db = DB(hostname='1.1.1.1', username='root', password='password', database=None, port=3306, charset="utf8mb4")
    
返回
作者最近主题: