当前位置 博文首页 > qq262593421的博客:Python操作MongoDB的工具类

    qq262593421的博客:Python操作MongoDB的工具类

    作者:[db:作者] 时间:2021-08-17 21:46

    目录

    ?

    一、环境准备

    1、python3安装

    2、MongoDB4安装

    二、安装驱动

    1、pip在线安装pymogo驱动

    2、pymogo驱动测试

    三、py工具类

    1、工具脚本

    2、返回结果

    3、查询结果


    一、环境准备

    1、python3安装

    windows离线安装python3.6.8环境:https://blog.csdn.net/qq262593421/article/details/111309116

    2、MongoDB4安装

    Linux 离线安装 python 3.6.10:https://blog.csdn.net/qq262593421/article/details/105395101?

    win10安装MongoDB4.4:https://blog.csdn.net/qq262593421/article/details/114277083

    二、安装驱动

    1、pip在线安装pymogo驱动

    pip install pymongo

    2、pymogo驱动测试

    python
    import pymongo

    三、py工具类

    1、工具脚本

    #-*- encoding: utf-8 -*-
    
    import pymongo
    
    class MongoDBUtil:
        """
        MongoDB工具类
        """
        def __init__(self, ip="127.0.0.1", db_name=None, port="27017"):
            """构造函数"""
            self.client = pymongo.MongoClient("mongodb://" + ip + ":" + port)
            self.database = self.client[db_name]
    
        def __del__(self):
            """析构函数"""
            # print("__del__")
            self.client.close()
    
        def create_database(self, db_name):
            """创建数据库"""
            return self.client.get_database(db_name)
    
        def drop_database(self, db_name):
            """删除数据库"""
            return self.client.drop_database(db_name)
    
        def select_database(self, db_name):
            """使用数据库"""
            self.database = self.client[db_name]
            return self.database
    
        def get_database(self, db_name):
            """使用数据库"""
            # return self.client[db_name]
            return self.client.get_database(db_name)
    
        def list_database_names(self):
            """获取所有数据库列表"""
            return self.client.list_database_names()
    
        def create_collection(self, collect_name):
            """创建集合"""
            collect = self.database.get_collection(collect_name)
            if(collect is not None):
                print("collection %s already exists" % collect_name)
                return collect
            return self.database.create_collection(collect_name)
    
        def drop_collection(self, collect_name):
            """获取所有集合名称"""
            return self.database.drop_collection(collect_name)
    
        def get_collection(self, collect_name):
            """获取集合"""
            return self.database.get_collection(collect_name)
    
        def list_collection_names(self):
            """获取所有集合名称"""
            return self.database.list_collection_names()
    
        def insert(self, collect_name, documents):
            """插入单条或多条数据"""
            return self.database.get_collection(collect_name).insert(documents)
    
        def insert_one(self, collect_name, document):
            """插入一条数据"""
            return self.database.get_collection(collect_name).insert_one(document)
    
        def insert_many(self, collect_name, documents):
            """插入多条数据"""
            return self.database.get_collection(collect_name).insert_many(documents)
    
        def delete_one(self, collect_name, filter, collation=None, hint=None, session=None):
            """删除一条记录"""
            return self.database.get_collection(collect_name).delete_one(filter, collation, hint, session)
    
        def delete_many(self, collect_name, filter, collation=None, hint=None, session=None):
            """删除所有记录"""
            return self.database.get_collection(collect_name).delete_many(filter, collation, hint, session)
    
        def find_one_and_delete(self, collect_name, filter, projection=None, sort=None, hint=None, session=None, **kwargs):
            """查询并删除一条记录"""
            return self.database.get_collection(collect_name).find_one_and_delete(filter, projection, sort, hint, session, **kwargs)
    
        def count_documents(self, collect_name, filter, session=None, **kwargs):
            """查询文档数目"""
            return self.database.get_collection(collect_name).count_documents(filter, session, **kwargs)
    
        def find_one(self, collect_name, filter=None, *args, **kwargs):
            """查询一条记录"""
            return self.database.get_collection(collect_name).find_one(filter, *args, **kwargs)
    
        def find(self, collect_name, *args, **kwargs):
            """查询所有记录"""
            return self.database.get_collection(collect_name).find(*args, **kwargs)
    
        def update(self, collect_name, spec, document, upsert=False, manipulate=False,
                   multi=False, check_keys=True, **kwargs):
            """更新所有记录"""
            return self.database.get_collection(collect_name).update(spec, document,
                                    upsert, manipulate, multi, check_keys, **kwargs)
    
        def update_one(self, collect_name, filter, update, upsert=False, bypass_document_validation=False,
                                    collation=None, array_filters=None, hint=None, session=None):
            """更新一条记录"""
            return self.database.get_collection(collect_name).update_one(filter, update,
                                    upsert, bypass_document_validation, collation, array_filters, hint, session)
    
        def update_many(self, collect_name, filter, update, upsert=False, array_filters=None,
                                    bypass_document_validation=False, collation=None, hint=None, session=None):
            """更新所有记录"""
            return self.database.get_collection(collect_name).update_many(filter, update,
                                    upsert, array_filters, bypass_document_validation, collation, hint, session)
    
        def find_one_and_update(self, collect_name, filter, update, projection=None, sort=None, upsert=False,
                               return_document=False, array_filters=None, hint=None, session=None, **kwargs):
            """查询并更新一条记录"""
            return self.database.get_collection(collect_name).find_one_and_update(filter, update, projection,
                                    sort, upsert, return_document, array_filters, hint, session, **kwargs)
    
    if __name__ == "__main__":
        print("------------------start-------------------------")
        mongoUtil = MongoDBUtil(ip="127.0.0.1", db_name="xl01", port="27017")
        """数据库操作"""
        stat = mongoUtil.create_database(db_name="xl01")
        # stat = mongoUtil.drop_database(db_name="xl01")
        stat = mongoUtil.list_database_names()
        stat = mongoUtil.get_database(db_name="xl01")
        """集合操作"""
        stat = mongoUtil.create_collection(collect_name="xl_collect_01")
        # stat = mongoUtil.drop_collection(collect_name="xl_collect_01")
        stat = mongoUtil.get_collection(collect_name="xl_collect_01")
        stat = mongoUtil.list_collection_names()
        """文档操作:增加"""
        document = {"name": "hao123", "type": "搜索引擎", "url": "http://www.hao123.com/"}
        stat = mongoUtil.insert_one(collect_name="xl_collect_01", document=document)
        # documents = [{'x': i} for i in range(2)]
        documents = [{"name": "hao123", "type": "搜索引擎"} for i in range(2)]
        # stat = mongoUtil.insert(collect_name="xl_collect_01", documents=documents)
        stat = mongoUtil.insert_many(collect_name="xl_collect_01", documents=documents)
        """文档操作:查询"""
        stat = mongoUtil.find_one(collect_name="xl_collect_01")
        print(type(stat), stat)
        rows = mongoUtil.find(collect_name="xl_collect_01")
        # for row in rows:
        #     print(row)
        filter = {'name': 'hao123'}
        # filter = {'x': 1}
        count = mongoUtil.count_documents(collect_name="xl_collect_01", filter=filter)
        print(type(stat), count)
        """文档操作:删除"""
        stat = mongoUtil.delete_one(collect_name="xl_collect_01", filter=filter)
        stat = mongoUtil.find_one_and_delete(collect_name="xl_collect_01", filter=filter)
        # stat = mongoUtil.delete_many(collect_name="xl_collect_01", filter=filter)
        print(type(stat), stat)
        """文档操作:修改"""
        spec = {"url": "http://www.baidu.com/"}
        # spec = {"url": "http://www.hao123.com/"}
        stat = mongoUtil.update(collect_name="xl_collect_01", spec=spec, document=document)
        print(type(stat), stat)
        update = {"$set": spec}
        stat = mongoUtil.update_one(collect_name="xl_collect_01", filter=filter, update=update)
        print(type(stat), stat.modified_count, stat)
        # stat = mongoUtil.update_many(collect_name="xl_collect_01", filter=filter, update=update)
        # print(type(stat), stat.modified_count, stat)
        stat = mongoUtil.find_one_and_update(collect_name="xl_collect_01", filter=filter, update=update)
        print(type(stat), stat)
        print("-------------------end--------------------------")

    2、返回结果

    ------------------start-------------------------
    collection xl_collect_01 already exists
    <class 'dict'> {'_id': ObjectId('6040dcac61d7850016fd1de7'), 'name': 'hao123', 'type': '搜索引擎', 'url': 'http://www.baidu.com/'}
    <class 'dict'> 6
    <class 'dict'> {'_id': ObjectId('6040dcac61d7850016fd1de8'), 'name': 'hao123', 'type': '搜索引擎'}
    <class 'dict'> {'n': 0, 'nModified': 0, 'ok': 1.0, 'updatedExisting': False}
    <class 'pymongo.results.UpdateResult'> 1 <pymongo.results.UpdateResult object at 0x0000020B40EF7E88>
    <class 'dict'> {'_id': ObjectId('6040dcac61d7850016fd1de9'), 'name': 'hao123', 'type': '搜索引擎', 'url': 'http://www.baidu.com/'}
    -------------------end--------------------------

    3、查询结果

    cs