Python连接Redis连接池

记录一下怎么用python连接Redis连接池。

安装依赖

1
pip install redis-py-cluster

Demo

直接上demo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import time
import redis
from redis.lock import Lock

class Redis:
def __init__(self, config):
self.expire = 1800,
self.redis_pool = redis.ConnectionPool(
host=config.REDIS_HOST,
port=config.REDIS_PORT,
password=config.REDIS_PASSWORD,
db=config.REDIS_DB,
decode_responses=config.REDIS_DECODE_RESPONSES,
max_connections=config.REDIS_MAX_CONNECTIONS)

# 从连接池中获取连接,获取不到就一直等待
def __get_connect(self):
while (1):
try:
connect = redis.Redis(connection_pool=self.redis_pool)
return connect
except:
time.sleep(5)

def get(self, key: str) -> str:
with self.__get_connect() as redis:
return redis.get(key)

def set(self, key, value, expire=None):
if expire is None:
expire = self.expire
with self.__get_connect() as redis:
redis.setex(key, expire, value)

def get_list(self, key) -> list:
with self.__get_connect() as redis:
return redis.lrange(key, 0, -1)

# 加锁,拿不到锁就一直等待
def get_lock(self, key="common_lock", timeout=5):
with self.__get_connect() as redis:
lock = redis.lock(key, timeout=timeout)
locked = lock.acquire(blocking=True, blocking_timeout=1)
if locked:
print("获取锁" + key + "成功。")
return lock
else:
print("获取锁" + key + "失败。")
time.sleep(5)
return self.get_lock(key, timeout)

def release_lock(self, lock: Lock):
try:
lock.release()
print("释放锁成功。")
except Exception as e:
print(e)
print("释放锁失败,可能已经被释放。")

def append(self, key: str, value: str):
with self.__get_connect() as redis:
redis.rpush(key, value)

def pop(self, key: str):
with self.__get_connect() as redis:
return redis.lpop(key)

def lenth(self, key: str):
with self.__get_connect() as redis:
redis.set(key,"空")
redis.delete()

def delete(self, key: str):
with self.__get_connect() as redis:
return redis.llen(key)