今天小编从项目的实际出发,由于项目某一个表的数据达到好几十万条,此时数据的增删查改会很慢;为了增加提高访问的速度,我们引入动态创建表。
代码如下:
from app_factory import app from sqlalchemy import Column, String, Integer class ProjectModel(app.db.model, app.db.Mixin): tablename = 'Project_' ID = Column(String(50), name='ID', doc='id') PROJECTNUMBER = Column(String(100), name='PROJECTNUMBER', doc='项目编号') ...... @staticmethod def create_table(project_number) table_name = ProjectModel.tablename + projectnumber structs = [ {'fieldname': 'id', 'type': 'varchar2(50)', 'primary': True, 'default': ''}, {'fieldname': 'PROJECTNUMBER', 'type': 'varchar2(50)', 'default': 0, 'isnull': True}, ....... ] app.db.create_table(table_name, structs)
那么,内层函数是如何创建的呢?其实就是拼接sql语句create table ....
代码如下:
class SQLAlchemyDB(SQLAlchemy): def __init__(self, app) super(SQLAlchemyDB, self).__init__(app) self.__app = app self.engine.echo = False self.conn = self.engine.connect() self.Model.to_dict() = lambda self:{c.name:getattr(self, c.name, None) for c in self.__table__.columns} self.Session = sessionmaker(bind=self.engine) self.ScopedSession = lambda: scoped_session(self.Session) # 释放碎片空间 def free_idle_space(self): return self.execute('purge recyclebin') def connstatus(self): return self.engine.pool.status() def close(self): self.conn.close() self.engine.dispose() # 非返回数据的记录语句 def execute(self, sqlexpr) try: ret = self.conn.execute(sqlalchemy.text(sqlexpr)) except Exception as err: return False, str(err) except sqlalchemy.exc.InvalidRequestError as err: return False, str(err) return True, '' # 动态拼接sql语句, 创建表 def create_table(self, tablename, structs): fieldinfos = [] for struct in structs: defaultvalue = struct.get('default') if defaultvalue : defaultvalue = "'{0}'".format(defaultvalue) if type(defaultvalue) == 'str' else str(defaultvalue) fieldinfos.append('{0} {1} {2} {3} {4}'.format(struct['fieldname'], struct['type'], 'primary key' if struct.get('primary') else '', ('default' + defaultvalue) if defaultvalue else '', '' if struct.get('isnull') else 'not null')) sql = 'create table {0} ({1})'.format(tablename, ','.join(fieldinfos)) ret, err = self.execute(sql) if ret: self.__app.sync_record(tablename, 'sql_createtable', {}, sql) return ret, err # 动态判断表是否存在 def existtable(self, tablename): ret, err = self.GetRecordCount("user_all_table", "TABLE_NAME='" + tablename +"'") return ret>0, err def GetRecordCount(self, tablename, where= None): sql = 'select count(*) as num from {0} {1}'.format(tablename,('where' + where)) if where != None else '') recs, err = self.query(sql) if recs: for rec in recs: return rec['num'], '' return -1 , err # 查询数据记录 def query(self, sqlexpr): try: recs = self.conn.execute(sqlalchemy.text(sqlexpr) return recs, '' expect Exception as err: return None, str(err) expect sqlalchemy.exc.InvalidRequestError as err: return None, str(err)