'redis', 'mongo', 'elasticsearch', 'mysql', 'sqlachemy', '线程安全的数据库封装,享元模式支持无限实例化调用'

db-libs的Python项目详细描述


db_libs

各种数据库的封装。只封装最难的部分,十分克制,很少去添加一些新的方法然后去调用原生类的方法。

因为原生类已经很好用了,主要是控制一下用户无限实例化连接类,造成反复创建连接就可以了。

redis_lib和mongo_lib在redis和pymongo的基础上进行封装。
由于原生的包已经足够好用了,并不需要重新写几百个方法进行过度封装。
仅仅是加了享元模式,使得在相同入参情况下无限实例化相关客户端类的时候不会反复重新连接。
封装方式采用的非常规方式,使用的是继承方式,而非通常情况下的使用组合来进行封装。
继承方式封装的比组合模式封装的好处更多。



mysql_lib使用连接池,兼容在多线程环境运行。使调用时候少关注cursor commit close 等。

sqla_lib 是反射已存在的表,也是使用连接池,兼容多线程环境运行。能够支持orm和原生sql语句两种执行方式。

# 组合模式封装的代码一般是如下这种例子。"""这种方式封装是组合,精确点是23种设计模式的代理模式。代理模式说的是定义很多方法,来调用self.r所具有的方法。另外对于无限实例化,还使用了享元模式。封装数据库切记不要使用单例模式,如果入参传了不同的主机ip或者不同的db,而仍然返回之前的连接对象,那就大错特错了。"""# 使用组合模式封装的redis,没有继承模式的好用。importredisclassRedisClient:params__reids_map={}def__init__(self,host,db,port,password):if(host,db,port,password)notinself.__class__.params__reids_map:self.r=redis.Redis(host,db,port,password)self.__class__.params__reids_map[(host,db,port,password)]=self.relse:self.r=self.__class__.params__reids_map[(host,db,port,password)]defmy_set(self,key,value):print('额外的扩展')self.r.set(name=key,value=value)defmy_get(self,name):# 这个封装简直是多次一举,在redis实例化时候,# 将Redis类的构造方法的入参 decode_responses设置为True,就可以避免几百个方法需要反复decode了。returnself.r.get(name).decode()

例如网上的一种封装,重新封装几百个方法,我不喜欢这样封装工具类。

importredisclassMyRedis():def__init__(self,ip,password,port=6379,db=1):#构造函数try:self.r=redis.Redis(host=ip,password=password,port=port,db=db)#连接redis固定方法,这里的值必须固定写死exceptExceptionase:print('redis连接失败,错误信息%s'%e)defstr_get(self,k):res=self.r.get(k)#会从服务器传对应的值过来,性能慢ifres:returnres.decode()#从redis里面拿到的是bytes类型的数据,需要转换一下defstr_set(self,k,v,time=None):#time默认失效时间self.r.set(k,v,time)defdelete(self,k):tag=self.r.exists(k)#判断这个key是否存在,相对于get到这个key他只是传回一个存在火灾不存在的信息,# 而不用将整个k值传过来(如果k里面存的东西比较多,那么传输很耗时)iftag:self.r.delete(k)else:print('这个key不存在')defhash_get(self,name,k):#哈希类型存储的是多层字典(嵌套字典)res=self.r.hget(name,k)ifres:returnres.decode()#因为get不到值得话也不会报错所以需要判断一下defhash_set(self,name,k,v):#哈希类型的是多层self.r.hset(name,k,v)#set也不会报错defhash_getall(self,name):res=self.r.hgetall(name)#得到的是字典类型的,里面的k,v都是bytes类型的data={}ifres:fork,vinres.items():#循环取出字典里面的k,v,在进行decodek=k.decode()v=v.decode()data[k]=vreturndatadefhash_del(self,name,k):res=self.r.hdel(name,k)ifres:print('删除成功')return1else:print('删除失败,该key不存在')return0@property#属性方法,# 使用的时候和变量一个用法就好比实例,A=MyRedis(), A.clean_redis使用,# 如果不加这个@property,使用时A=MyRedis(), A.clean_redis()   后面需要加这个函数的括号defclean_redis(self):self.r.flushdb()#清空 redisprint('清空redis成功!')return0a=MyRedis('118.0000','HK0000*')print(a.str_get('duan'))

sqla_lib使用如下,可以orm操作已存在的数据库。

if__name__=='__main__':"""    例如 ihome_area2的表结果如下。    create table ihome_area2(    create_time datetime    null,    update_time datetime    null,    id          int auto_increment        primary key,    name        varchar(32) not null);    """enginex=create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/aj?charset=utf8',max_overflow=10,# 超过连接池大小外最多创建的连接pool_size=50,# 连接池大小pool_timeout=30,# 池中没有线程最多等待的时间,否则报错pool_recycle=3600,# 多久之后对线程池中的线程进行一次连接的回收(重置)echo=True)sqla_helper=SqlaReflectHelper(enginex)Ihome_area2=sqla_helper.base_classes.ihome_area2# ihome_area2是表名。deff1():withsqla_helper.sessionasss:ss# type: _SessionContextprint(ss)print(ss.query(sqlalchemy.func.count(Ihome_area2.id)).scalar())# 使用orm方式插入ss.add(Ihome_area2(create_time=datetime.now(),update_time=datetime.now(),name='testname'))print(ss.query(sqlalchemy.func.count(Ihome_area2.id)).scalar())# 使用占位符语法插入,此种可以防止sql注入ss.execute(f'''INSERT INTO ihome_area2 (create_time, update_time, name) VALUES (:v1,:v2,:v3)''',params={'v1':'2020-06-14 19:15:14','v2':'2020-06-14 19:15:14','v3':'testname00'})# 直接自己拼接完整字符串,不使用三方包占位符的后面的参数,此种会引起sql注入,不推荐。cur=ss.execute(f'''INSERT INTO ihome_area2 (create_time, update_time, name) VALUES ('2020-06-14 19:15:14','2020-06-14 19:15:14', 'testname')''',)# 这样也可以打印执行的语句# noinspection PyProtectedMemberprint(cur._saved_cursor._executed)# 使用最原生的语句,直接调用了pymysql的cursor对象。conny=sqla_helper.engine.raw_connection()cury=conny.cursor(DictCursor)# type: DictCursorprint(cury)cury.execute('SELECT * FROM ihome_area2 LIMIT 3')result=cury.fetchall()print(result)conny.commit()cury.close()conny.close()deff2():ss=sqla_helper.get_session_factory()()print(ss)print(ss.query(sqlalchemy.func.count(sqla_helper.base_classes.ihome_area.id)).scalar())ss.add(sqla_helper.base_classes.ihome_area(create_time=datetime.now(),update_time=datetime.now(),name='testname'))ss.commit()print(ss.query(sqlalchemy.func.count(sqla_helper.base_classes.ihome_area.id)).scalar())ss.close()withdecorator_libs.TimerContextManager():t_pool=BoundedThreadPoolExecutor(10)# 封装mysql,切记一定要测试多线程下的情况。for_inrange(500):# f1()t_pool.submit(f1)t_pool.shutdown()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何从tester类访问/更改ArrayList?   java如何在Android中以编程方式更改菜单标题   spring boot在Kafka Java中为同一主题创建多个消费者组   java JVM字节码表示法,注释语法。调用动力学   java活动在旋转后泄漏了窗口PopupWindow   java允许Nashorn运行用户代码   Java Joda Time实现一个日期范围迭代器   当字符串长度为奇数时使用递归打印字符串的java基本情况   java无法从JSONArray获取JSONObject   java有没有一种方法可以使用单个示例文件进行所有测试?   java My bufferedReader读取整个文件,而不仅仅是一行。为什么?   当已有用户输入时,java变量为null   java如何将正则表达式与阿拉伯语文本一起使用   java Selenium WebDriver“单击”和JavascriptExecutor单击之间有什么区别   java在运行代码时,排序方法会在第二个数组应该按升序或降序排序时打印相同的精确数组号   java如何在HashMap中添加多个具有相同键的相同类型的对象   java有人知道为什么菜单栏在使用系统gtk主题时会显示白色文本吗?   在tomcat中用java方法访问JavaScript文件