1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| import time from redis.lock import Lock from rediscluster import RedisCluster, ClusterConnectionPool
class RedisCluster:
    """Thin convenience wrapper around a Redis Cluster connection pool.

    Every public method borrows a client backed by the shared pool, runs a
    single Redis command (or lock operation), and lets the ``with`` block
    close the client again.

    NOTE(review): this class deliberately keeps its original name, which
    shadows ``rediscluster.RedisCluster`` at module level.  The real client
    class is therefore imported under an alias inside ``__get_connect``.
    """

    def __init__(self, config):
        """Build the shared cluster connection pool.

        Args:
            config: Object exposing ``REDIS_CLUSTER_NODES``,
                ``REDIS_CLUSTER_PASSWORD``, ``REDIS_DECODE_RESPONSES``,
                ``REDIS_CLUSTER_ENCODE`` and ``REDIS_CLUSTER_MAX_CONNECTIONS``.
        """
        # Default TTL in seconds used by ``set``.  The original had a trailing
        # comma here, which silently turned the value into the tuple (1800,).
        self.expire = 1800
        self.redis_pool = ClusterConnectionPool(
            startup_nodes=config.REDIS_CLUSTER_NODES,
            password=config.REDIS_CLUSTER_PASSWORD,
            decode_responses=config.REDIS_DECODE_RESPONSES,
            # redis-py connections take ``encoding`` (singular); the previous
            # ``encodings`` keyword is not a recognised connection argument.
            encoding=config.REDIS_CLUSTER_ENCODE,
            max_connections=config.REDIS_CLUSTER_MAX_CONNECTIONS,
        )

    def __get_connect(self):
        """Return a cluster client bound to the shared pool.

        Retries forever, sleeping 5 seconds between attempts, until a client
        can be created.
        """
        # Imported locally under an alias because this wrapper class shadows
        # the module-level ``RedisCluster`` name.  Kept outside the ``try`` so
        # a missing dependency fails loudly instead of retrying forever.
        from rediscluster import RedisCluster as _ClusterClient

        while True:
            try:
                return _ClusterClient(connection_pool=self.redis_pool)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit can still stop the process.
                time.sleep(5)

    def get(self, key: str) -> str:
        """Return the value stored at ``key`` (whatever ``GET`` yields)."""
        with self.__get_connect() as redis:
            return redis.get(key)

    def set(self, key, value, expire=None):
        """Store ``value`` at ``key`` with a TTL.

        Args:
            key: Redis key.
            value: Value to store.
            expire: TTL in seconds; defaults to ``self.expire`` when ``None``.
        """
        if expire is None:
            expire = self.expire
        with self.__get_connect() as redis:
            redis.setex(key, expire, value)

    def get_list(self, key) -> list:
        """Return every element of the list stored at ``key``."""
        with self.__get_connect() as redis:
            return redis.lrange(key, 0, -1)

    def get_lock(self, key="common_lock", timeout=5):
        """Acquire the lock named ``key``, waiting until it is obtained.

        Args:
            key: Name of the lock.
            timeout: Lock timeout passed through to ``redis.lock``.

        Returns:
            The acquired lock object; hand it to ``release_lock`` when done.
        """
        # Iterative retry: the original recursed on every failed attempt,
        # which grows the call stack without bound while the lock is held
        # elsewhere.
        while True:
            with self.__get_connect() as redis:
                lock = redis.lock(key, timeout=timeout)
                if lock.acquire(blocking=True, blocking_timeout=1):
                    print("获取锁" + key + "成功。")
                    return lock
            print("获取锁" + key + "失败。")
            time.sleep(5)

    def release_lock(self, lock: "Lock"):
        """Release ``lock``; failures are printed, never raised."""
        try:
            lock.release()
            print("释放锁成功。")
        except Exception as e:
            # Best effort: the lock may already have been released.
            print(e)
            print("释放锁失败,可能已经被释放。")

    def append(self, key: str, value: str):
        """Push ``value`` onto the tail of the list stored at ``key``."""
        with self.__get_connect() as redis:
            redis.rpush(key, value)

    def pop(self, key: str):
        """Pop and return the head element of the list stored at ``key``."""
        with self.__get_connect() as redis:
            return redis.lpop(key)

    def lenth(self, key: str):
        """Return the length of the list stored at ``key``.

        The misspelled name is kept for existing callers.  The original body
        of this method and that of ``delete`` were swapped.
        """
        with self.__get_connect() as redis:
            return redis.llen(key)

    # Correctly spelled alias; ``lenth`` stays for backward compatibility.
    length = lenth

    def delete(self, key: str):
        """Delete ``key`` and return the result of the ``DEL`` command."""
        with self.__get_connect() as redis:
            # The original called ``redis.delete()`` without any key.
            return redis.delete(key)
|