当前位置 博文首页 > qq262593421的博客:Python3实现MySQL数据增量更新同步到MongoDB

    qq262593421的博客:Python3实现MySQL数据增量更新同步到MongoDB

    作者:qq262593421 时间:2021-08-17 21:45

    目录

    一、MySQL工具类

    二、MongoDB工具类

    三、数据同步实现代码


    一、MySQL工具类

    # -*- encoding: utf-8 -*-
    
    import pymysql
    
    class MySQLUtil:
        """
        Small helper around a single pymysql connection.

        The connection is opened in the constructor and closed by ``close()``,
        by leaving a ``with`` block, or when the object is garbage-collected.
        """

        def __init__(self, host="127.0.0.1", user=None, passwd=None, db=None, charset="utf8", *args, **kwargs):
            """Open a connection to a MySQL server.

            :param host: server host name or IP address.
            :param user: login user name.
            :param passwd: login password.
            :param db: database selected after connecting (may be None).
            :param charset: connection character set.
            Any further arguments are forwarded to ``pymysql.connect``.
            """
            self.host = host
            self.user = user
            self.passwd = passwd
            self.db = db
            # Bound before connecting so close()/__del__ work even if connect() raises.
            self.conn = None
            # ``password``/``database`` are pymysql's canonical names; ``passwd``/``db``
            # are deprecated aliases there.
            self.conn = pymysql.connect(host=host, user=user, password=passwd, database=db,
                                        charset=charset, *args, **kwargs)

        def __del__(self):
            """Close the connection when the object is garbage-collected."""
            self.close()

        def __enter__(self):
            """Support ``with MySQLUtil(...) as util:``."""
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            """Close the connection when leaving the ``with`` block."""
            self.close()

        def close(self):
            """Close the connection if it is open; safe to call more than once."""
            conn = getattr(self, "conn", None)
            if conn is not None and conn.open:
                conn.close()

        def _query(self, sql, args=None):
            """Execute ``sql`` with optional bound ``args`` and return all rows.

            The cursor is always closed afterwards. No commit is issued.
            """
            with self.conn.cursor() as cursor:
                cursor.execute(sql, args)
                return cursor.fetchall()

        def get_cursor(self):
            """Return a new cursor on the connection; the caller must close it."""
            return self.conn.cursor()

        def select_db(self, db):
            """Switch the connection's current database to ``db``."""
            self.conn.select_db(db)
            # Keep the attribute in sync with the connection's actual database.
            self.db = db

        def list_databases(self):
            """Return the rows of ``SHOW DATABASES``."""
            return self._query("SHOW DATABASES")

        def list_tables(self):
            """Return the rows of ``SHOW TABLES`` for the current database."""
            return self._query("SHOW TABLES")

        def execute(self, sql, args=None):
            """Execute ``sql`` (with optional %s-style ``args``) and return all rows.

            No commit is issued here, so data-modifying statements are not
            committed unless ``self.conn.commit()`` is called (or the connection
            was opened with autocommit enabled).
            """
            return self._query(sql, args)

        def get_version(self):
            """Print and return the row produced by ``SELECT VERSION()``."""
            with self.conn.cursor() as cursor:
                cursor.execute("SELECT VERSION()")
                version = cursor.fetchone()
            print("MySQL Version : %s" % version)
            return version

        def list_table_metadata(self):
            """Return information_schema.TABLES rows for all non-system schemas."""
            sql = "SELECT * FROM information_schema.TABLES WHERE TABLE_TYPE !='SYSTEM VIEW' AND TABLE_SCHEMA NOT IN ('sys','mysql','information_schema','performance_schema')"
            return self._query(sql)

        def get_table_fields(self, db, table, args=None):
            """Return the column names of ``db``.``table`` in table-definition order.

            ``db`` and ``table`` are sent as bound parameters, so they cannot
            inject SQL. ``args`` is kept for backward compatibility and ignored:
            the query has no other placeholders.
            """
            sql = ("SELECT COLUMN_NAME FROM information_schema.COLUMNS "
                   "WHERE table_schema = %s AND table_name = %s "
                   "ORDER BY ORDINAL_POSITION")
            return [row[0] for row in self._query(sql, (db, table))]

        def table_metadata(self, db, table, args=None):
            """Return per-column metadata of ``db``.``table``, in column order.

            Each row is (column_name, column_type, ordinal_position,
            column_comment, column_default). ``db`` and ``table`` are bound
            parameters. ``args`` is kept for backward compatibility and ignored.
            """
            sql = """
            SELECT 
                column_name,column_type,ordinal_position,column_comment,column_default 
            FROM 
                information_schema.COLUMNS 
            WHERE 
                table_schema = %s AND table_name = %s
            ORDER BY ordinal_position;
            """
            return self._query(sql, (db, table))
    
    
    if __name__ == "__main__":
        # Demo: exercise each helper against a local "test" database using a
        # single connection.
        mysqlUtil = MySQLUtil(host="127.0.0.1", user="root", passwd="123456", db="test")
        mysqlUtil.get_version()
        conn = mysqlUtil.conn
        mysqlUtil.select_db("test")
        print(type(conn.db), conn.db)
        databases = mysqlUtil.list_databases()
        print(type(databases), databases)
        tables = mysqlUtil.list_tables()
        print(type(tables), tables)
        sql = "SELECT * FROM t_user"
        result = mysqlUtil.execute(sql)
        for i in result:
            print(i)
        # One metadata row per column of t_user.
        for column in mysqlUtil.table_metadata("test", "t_user"):
            print(column)
        # Plain column names of t_user.
        for field in mysqlUtil.get_table_fields("test", "t_user"):
            print(field)

    二、MongoDB工具类

    #-*- encoding: utf-8 -*-
    
    import pymongo
    
    class MongoDBUtil:
        """
        Convenience wrapper around ``pymongo.MongoClient`` bound to one database.

        Most methods take the collection name first and forward the remaining
        arguments, by keyword, to the pymongo ``Collection`` method of the same name.
        """

        def __init__(self, ip="127.0.0.1", db_name=None, port="27017"):
            """Connect to ``mongodb://<ip>:<port>`` and select ``db_name``.

            :param ip: server host name or IP address.
            :param db_name: database to work on; when None, no database is
                selected until ``select_database`` is called.
            :param port: server port, as a str or an int.
            """
            # Bound first so __del__ is safe even if MongoClient() raises.
            self.client = None
            self.database = None
            self.client = pymongo.MongoClient(self._build_uri(ip, port))
            if db_name is not None:
                self.database = self.client[db_name]

        def __del__(self):
            """Close the client when the object is garbage-collected."""
            client = getattr(self, "client", None)
            if client is not None:
                client.close()

        @staticmethod
        def _build_uri(ip, port):
            """Return the connection string ``mongodb://<ip>:<port>``."""
            return "mongodb://%s:%s" % (ip, port)

        @staticmethod
        def _is_operator_update(document):
            """Return True if ``document`` is an update (``$`` operators or a
            pipeline list) rather than a whole replacement document."""
            if isinstance(document, list):
                return True
            return any(str(key).startswith("$") for key in document)

        def create_database(self, db_name):
            """Return a handle to ``db_name``; MongoDB creates it on first write."""
            return self.client.get_database(db_name)

        def drop_database(self, db_name):
            """Drop the database ``db_name``."""
            return self.client.drop_database(db_name)

        def select_database(self, db_name):
            """Make ``db_name`` the database used by the collection methods and return it."""
            self.database = self.client[db_name]
            return self.database

        def get_database(self, db_name):
            """Return a handle to ``db_name`` without changing the selected database."""
            return self.client.get_database(db_name)

        def list_database_names(self):
            """Return the names of all databases on the server."""
            return self.client.list_database_names()

        def create_collection(self, collect_name):
            """Create ``collect_name`` in the selected database, or return it if it exists."""
            # get_collection() always returns a (lazy) Collection object, never None,
            # so existence has to be checked against the server's collection list.
            if collect_name in self.database.list_collection_names():
                print("collection %s already exists" % collect_name)
                return self.database.get_collection(collect_name)
            return self.database.create_collection(collect_name)

        def drop_collection(self, collect_name):
            """Drop the collection ``collect_name``."""
            return self.database.drop_collection(collect_name)

        def get_collection(self, collect_name):
            """Return the collection ``collect_name`` of the selected database."""
            return self.database.get_collection(collect_name)

        def list_collection_names(self):
            """Return the names of all collections in the selected database."""
            return self.database.list_collection_names()

        def insert(self, collect_name, documents):
            """Insert one document or an iterable of documents.

            Emulates the legacy ``Collection.insert`` (removed in PyMongo 4) on top
            of ``insert_one``/``insert_many``.

            :returns: the new ``_id`` for a single document, or the list of new
                ``_id`` values for several documents.
            """
            from collections.abc import Mapping

            collection = self.get_collection(collect_name)
            if isinstance(documents, Mapping):
                return collection.insert_one(documents).inserted_id
            return collection.insert_many(list(documents)).inserted_ids

        def insert_one(self, collect_name, document):
            """Insert a single document."""
            return self.get_collection(collect_name).insert_one(document)

        def insert_many(self, collect_name, documents):
            """Insert several documents."""
            return self.get_collection(collect_name).insert_many(documents)

        def delete_one(self, collect_name, filter, collation=None, hint=None, session=None):
            """Delete the first document matching ``filter``."""
            return self.get_collection(collect_name).delete_one(
                filter, collation=collation, hint=hint, session=session)

        def delete_many(self, collect_name, filter, collation=None, hint=None, session=None):
            """Delete every document matching ``filter``."""
            return self.get_collection(collect_name).delete_many(
                filter, collation=collation, hint=hint, session=session)

        def find_one_and_delete(self, collect_name, filter, projection=None, sort=None, hint=None, session=None, **kwargs):
            """Delete the first document matching ``filter`` and return it."""
            return self.get_collection(collect_name).find_one_and_delete(
                filter, projection=projection, sort=sort, hint=hint, session=session, **kwargs)

        def count_documents(self, collect_name, filter, session=None, **kwargs):
            """Return the number of documents matching ``filter``."""
            return self.get_collection(collect_name).count_documents(filter, session=session, **kwargs)

        def find_one(self, collect_name, filter=None, *args, **kwargs):
            """Return the first document matching ``filter`` (or None)."""
            return self.get_collection(collect_name).find_one(filter, *args, **kwargs)

        def find(self, collect_name, *args, **kwargs):
            """Return a cursor over the documents matching the query arguments."""
            return self.get_collection(collect_name).find(*args, **kwargs)

        def update(self, collect_name, spec, document, upsert=False, manipulate=False,
                   multi=False, check_keys=True, **kwargs):
            """Emulate the legacy ``Collection.update`` (removed in PyMongo 4).

            An update document (``$`` operators or a pipeline list) is applied with
            ``update_many`` when ``multi`` is true, else ``update_one``. Any other
            document replaces the first match via ``replace_one``. ``manipulate``
            and ``check_keys`` are accepted for backward compatibility and ignored.
            Extra keyword arguments are forwarded to the pymongo call. The legacy
            write-concern keywords (``w``, ``j``, ...) are not supported.

            :returns: the raw server response dict, as the legacy method did.
            :raises ValueError: if ``multi`` is true for a replacement document.
            """
            collection = self.get_collection(collect_name)
            if self._is_operator_update(document):
                method = collection.update_many if multi else collection.update_one
                result = method(spec, document, upsert=upsert, **kwargs)
            else:
                if multi:
                    raise ValueError("multi=True requires an update document of $ operators")
                result = collection.replace_one(spec, document, upsert=upsert, **kwargs)
            return result.raw_result

        def update_one(self, collect_name, filter, update, upsert=False, bypass_document_validation=False,
                                    collation=None, array_filters=None, hint=None, session=None):
            """Apply ``update`` to the first document matching ``filter``."""
            return self.get_collection(collect_name).update_one(
                filter, update, upsert=upsert, bypass_document_validation=bypass_document_validation,
                collation=collation, array_filters=array_filters, hint=hint, session=session)

        def update_many(self, collect_name, filter, update, upsert=False, array_filters=None,
                                    bypass_document_validation=False, collation=None, hint=None, session=None):
            """Apply ``update`` to every document matching ``filter``."""
            return self.get_collection(collect_name).update_many(
                filter, update, upsert=upsert, array_filters=array_filters,
                bypass_document_validation=bypass_document_validation,
                collation=collation, hint=hint, session=session)

        def find_one_and_update(self, collect_name, filter, update, projection=None, sort=None, upsert=False,
                               return_document=False, array_filters=None, hint=None, session=None, **kwargs):
            """Update the first document matching ``filter`` and return it.

            ``return_document=False`` (pymongo's ``ReturnDocument.BEFORE``) returns
            the document as it was before the update.
            """
            return self.get_collection(collect_name).find_one_and_update(
                filter, update, projection=projection, sort=sort, upsert=upsert,
                return_document=return_document, array_filters=array_filters,
                hint=hint, session=session, **kwargs)
    
    if __name__ == "__main__":
        print("------------------start-------------------------")
        # mongoUtil = MongoDBUtil(ip="192.168.81.165", port="27017")
        mongoUtil = MongoDBUtil(ip="127.0.0.1", db_name="xl01", port="27017")
        # Database operations
        stat = mongoUtil.create_database(db_name="xl01")
        # stat = mongoUtil.drop_database(db_name="xl01")
        stat = mongoUtil.list_database_names()
        stat = mongoUtil.get_database(db_name="xl01")
        # Collection operations
        stat = mongoUtil.create_collection(collect_name="xl_collect_01")
        # stat = mongoUtil.drop_collection(collect_name="xl_collect_01")
        stat = mongoUtil.get_collection(collect_name="xl_collect_01")
        stat = mongoUtil.list_collection_names()
        # Documents: insert
        document = {"name": "hao123", "type": "搜索引擎", "url": "http://www.hao123.com/"}
        stat = mongoUtil.insert_one(collect_name="xl_collect_01", document=document)
        documents = [{"name": "hao123", "type": "搜索引擎"} for _ in range(2)]
        # stat = mongoUtil.insert(collect_name="xl_collect_01", documents=documents)
        stat = mongoUtil.insert_many(collect_name="xl_collect_01", documents=documents)
        # Documents: query
        stat = mongoUtil.find_one(collect_name="xl_collect_01")
        print(type(stat), stat)
        rows = mongoUtil.find(collect_name="xl_collect_01")
        # for row in rows:
        #     print(row)
        doc_filter = {'name': 'hao123'}
        count = mongoUtil.count_documents(collect_name="xl_collect_01", filter=doc_filter)
        print(type(count), count)
        # Documents: delete
        stat = mongoUtil.delete_one(collect_name="xl_collect_01", filter=doc_filter)
        stat = mongoUtil.find_one_and_delete(collect_name="xl_collect_01", filter=doc_filter)
        # stat = mongoUtil.delete_many(collect_name="xl_collect_01", filter=doc_filter)
        print(type(stat), stat)
        # Documents: update
        spec = {"url": "http://www.baidu.com/"}
        # insert_one() added an "_id" to `document`; leave it out of the replacement
        # so it never tries to change the _id of an already-stored document.
        replacement = {key: value for key, value in document.items() if key != "_id"}
        stat = mongoUtil.update(collect_name="xl_collect_01", spec=spec, document=replacement)
        print(type(stat), stat)
        update = {"$set": spec}
        stat = mongoUtil.update_one(collect_name="xl_collect_01", filter=doc_filter, update=update)
        print(type(stat), stat.modified_count, stat)
        # stat = mongoUtil.update_many(collect_name="xl_collect_01", filter=doc_filter, update=update)
        # print(type(stat), stat.modified_count, stat)
        stat = mongoUtil.find_one_and_update(collect_name="xl_collect_01", filter=doc_filter, update=update)
        print(type(stat), stat)
        print("-------------------end--------------------------")

    三、数据同步实现代码

    注:下面的实现每次运行都会把整张表的全部数据写入 MongoDB,并不判断哪些数据已经同步过;重复运行会产生重复文档。

    #-*- encoding: utf-8 -*-
    
    import sys, uuid
    sys.path.append(r'..')
    from MongoDB.MongoDBUtil import MongoDBUtil
    from MySQL.MySQLUtil import MySQLUtil
    
    class SyncMysqlMongo:
        """
        Copy the rows of a MySQL table into a MongoDB collection.
        """

        def __init__(self, mysql_ip, mysql_user, mysql_passwd, mysql_db, mongo_ip, mongo_db):
            """Open the MySQL connection and the MongoDB database used for syncing."""
            self.mysqlUtil = MySQLUtil(mysql_ip, mysql_user, mysql_passwd, mysql_db)
            self.mongoUtil = MongoDBUtil(mongo_ip, mongo_db)

        @staticmethod
        def _quote_identifier(name):
            """Quote a MySQL identifier with backticks, doubling any embedded backtick."""
            return "`" + str(name).replace("`", "``") + "`"

        @staticmethod
        def _rows_to_documents(fields, rows):
            """Turn result rows into dicts mapping each column name to its value."""
            return [dict(zip(fields, row)) for row in rows]

        def mysqlToMongo(self, mysql_database, mysql_table, mongo_collect_name):
            """Copy every row of ``mysql_database``.``mysql_table`` into ``mongo_collect_name``.

            Steps:
            1. read the table's column names and all of its rows from MySQL;
            2. build one dict per row, keyed by full column name;
            3. insert the dicts into MongoDB one at a time.

            This is a full copy: running it twice inserts every row twice.

            :raises ValueError: if no columns are found for the table (e.g. it
                does not exist).
            """
            table_field = self.mysqlUtil.get_table_fields(mysql_database, mysql_table)
            if not table_field:
                raise ValueError("table %s.%s not found or has no columns" % (mysql_database, mysql_table))
            # Select the columns explicitly, in table_field order, so that zip() pairs
            # each value with its own column name. Qualify the table with its
            # database, because the connection may currently have another one selected.
            columns = ", ".join(self._quote_identifier(field) for field in table_field)
            sql = "SELECT %s FROM %s.%s" % (columns,
                                           self._quote_identifier(mysql_database),
                                           self._quote_identifier(mysql_table))
            table_data = self.mysqlUtil.execute(sql)
            # Print the column header line.
            print(*table_field)
            documents = self._rows_to_documents(table_field, table_data)
            # Insert one by one (insert_many would be the batched alternative).
            for document in documents:
                print(document)
                self.mongoUtil.insert_one(mongo_collect_name, document)
    
    if __name__ == "__main__":
        # Source MySQL server and target MongoDB server settings.
        mysql_conf = dict(mysql_ip="127.0.0.1", mysql_user="root", mysql_passwd="123456", mysql_db="test")
        mongo_conf = dict(mongo_ip="127.0.0.1", mongo_db="xl01")
        syncsql = SyncMysqlMongo(**mysql_conf, **mongo_conf)
        # Copy the whole test.t_user table into the t_user collection.
        syncsql.mysqlToMongo(mysql_database="test", mysql_table="t_user", mongo_collect_name="t_user")
    

    cs