AWS Official Blog

Building a Valkey Cluster Client in Python: A Guide from Zero to Production

1. The Rise of Valkey: A New Chapter for Open-Source In-Memory Databases

In today's high-concurrency, distributed systems, in-memory databases have become an indispensable architectural component. On March 28, 2024, the open-source community went through a major shake-up: following Redis's license change, the Linux Foundation announced Valkey, a community-driven open-source fork.

As a new-generation distributed cache, Valkey offers clear advantages over legacy Redis deployments. On the performance side, Valkey 8.0 can reach roughly 1.2 million requests per second on a single node, while its optimized storage layout saves about 20% of memory for the same dataset, improving performance and lowering operating cost at the same time.

Architecturally, the higher per-node throughput lets you run the same workload on fewer nodes or smaller instance types, significantly cutting infrastructure cost. For managed services in particular, ElastiCache/MemoryDB for Valkey 7.2 can reduce cost by 20%–33%.

Valkey 8.0 also makes important strides in reliability. A dual-channel mechanism streams the RDB snapshot and the replication backlog over separate connections, speeding up synchronization and moving work off the primary into an independent process. An improved resharding mechanism strengthens fault tolerance during node failures, and comprehensive slot-level metrics give operations teams much better observability. Together, these traits make Valkey a compelling choice for building high-performance, low-cost, highly reliable distributed caches.

2. Valkey Glide: A Next-Generation Distributed Cache Client from AWS

Valkey Glide is an open-source client built by AWS for Valkey and Redis OSS, designed around the reliability lessons of operating caches at cloud scale.

2.1 Multi-Language Support and Version Compatibility

Language ecosystem: native support for Java, Python, Node.js, Scala, and Kotlin

Version compatibility:

  • Redis 6.2, 7.0, and 7.2
  • Full compatibility with Valkey 7.2 and later

2.2 Core Technical Features

2.2.1 Intelligent Topology Awareness

Traditional cluster clients often lose availability when the cluster topology changes. Glide addresses this with a majority-rule algorithm: it samples the topology view of multiple nodes and adopts the view that the majority agrees on. This detects topology changes efficiently, avoids storms of repeated CLUSTER commands, and lets the client react to topology changes within milliseconds, keeping the application both agile and stable.
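Glide's actual implementation lives in its Rust core, but the majority-rule idea can be sketched in a few lines of Python: collect each sampled node's view of the cluster and adopt the view a strict majority agrees on, rather than trusting any single node. The view digests below are illustrative stand-ins for real CLUSTER SLOTS/CLUSTER SHARDS responses, not Glide's data structures:

```python
from collections import Counter
from typing import Hashable, List, Optional


def agree_on_topology(views: List[Hashable]) -> Optional[Hashable]:
    """Return the topology view shared by a strict majority of sampled nodes.

    Each element of `views` is a hashable digest of one node's view of the
    cluster, e.g. a tuple of (slot_range, primary_address) pairs. Returns
    None when no view reaches a majority, in which case a real client would
    fall back to sampling more nodes.
    """
    if not views:
        return None
    view, count = Counter(views).most_common(1)[0]
    return view if count * 2 > len(views) else None


# Three nodes agree on the fresh view; one lags behind with a stale one:
stale = (("0-16383", "10.0.0.9:6379"),)
fresh = (("0-8191", "10.0.0.1:6379"), ("8192-16383", "10.0.0.2:6379"))
print(agree_on_topology([fresh, fresh, stale, fresh]))  # the majority (fresh) view wins
```

Because a single stale or partitioned node cannot outvote the rest, the client avoids flip-flopping its routing table on every divergent answer.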

2.2.2 Enhanced Read/Write Strategies

Glide offers flexible read modes for different business scenarios. In PRIMARY mode, all reads and writes go through the primary node, preserving strong consistency. In PREFER_REPLICA mode, reads are served from replicas first and fall back seamlessly to the primary when no replica is available, giving you intelligent read routing and higher availability at the cost of strict consistency.
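In the Python client this choice is a single configuration field. The fragment below is a sketch assuming valkey-glide's `ReadFrom` enum; the endpoint address is a placeholder and needs a live cluster to actually connect:

```python
from glide import (
    GlideClusterClient,
    GlideClusterClientConfiguration,
    NodeAddress,
    ReadFrom,
)


async def create_replica_reading_client() -> GlideClusterClient:
    config = GlideClusterClientConfiguration(
        addresses=[NodeAddress("your-cluster-endpoint", 6379)],  # placeholder
        # ReadFrom.PRIMARY (the default) routes every read to the primary;
        # PREFER_REPLICA spreads reads across replicas and falls back to
        # the primary only when no replica is available.
        read_from=ReadFrom.PREFER_REPLICA,
    )
    return await GlideClusterClient.create(config)
```

PREFER_REPLICA suits read-heavy workloads that tolerate replication lag; keep the default for read-your-own-writes semantics.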

2.2.3 Automatic Subscription Recovery

In Pub/Sub scenarios, Glide removes a significant operational burden. When a topology change breaks a connection, the client automatically re-subscribes and transparently redirects the connection to the new node. Applications no longer need to rebuild subscriptions or manage connections by hand, which simplifies the application layer and improves both development efficiency and runtime stability.

2.3 Use Cases

These properties make Glide a good fit for many modern distributed systems: service coordination in microservice architectures, high-performance distributed caching, low-latency real-time messaging, and large-scale applications under heavy concurrency. Its flexible architecture handles the varying demands of these scenarios and makes it a solid bridge between applications and their data stores.

3. Project Setup and Dependency Installation

3.1 Valkey Cluster Configuration

3.1.1 Create a Valkey Cluster in the AWS Console

After signing in to the AWS Console, search for Amazon ElastiCache, open the Valkey caches view under Resources, and click "Create Valkey cache".

3.1.2 Configure Cluster Parameters

For this demo, choose "Design your own cache" as the deployment option, "Cluster cache" as the creation method, and set cluster mode to "Enabled". Then click Next to continue to the advanced settings.

In the advanced settings, leave encryption in transit unchecked to keep the demo simple. Under security groups, select a security group in the relevant VPC.

The security group's inbound rules must allow at least port 6379. Restrict the source to your VPC or specific CIDR ranges as your network environment requires; this demo uses 0.0.0.0/0 (access from anywhere) purely for convenience, which is not appropriate for production.

Keep clicking "Next" until you reach the "Create" button, then wait for the cluster to finish creating.

Once the cluster is ready, note its "configuration endpoint", the access point for the Valkey cluster. We will use this address later when connecting the client.

3.2 Python Client Implementation

3.2.1 Initialize the Project

# Create the project directory
mkdir valkey-glide-demo
cd valkey-glide-demo

3.2.2 Install the Glide Dependency

# Install the Valkey Glide client
pip install valkey-glide

3.2.3 Code Walkthrough: A Simple Valkey Cluster Client

import asyncio
from typing import List, Tuple

from glide import (
    AllNodes,
    ClosingError,
    ConnectionError,
    GlideClusterClient,
    GlideClusterClientConfiguration,
    InfoSection,
    Logger,
    LogLevel,
    NodeAddress,
    RequestError,
    TimeoutError,
)


async def create_client(
    nodes_list: List[Tuple[str, int]] = [("valkey-demo.titbpn.clustercfg.use1.cache.amazonaws.com", 6379)]
) -> GlideClusterClient:
    """
    Creates and returns a GlideClusterClient instance.

    This function initializes a GlideClusterClient with the provided list of nodes.
    The nodes_list may contain the address of one or more cluster nodes, and the
    client will automatically discover all nodes in the cluster.

    Args:
        nodes_list (List[Tuple[str, int]]): A list of tuples where each tuple
            contains a host (str) and port (int). Defaults to the demo cluster's
            configuration endpoint on port 6379.

    Returns:
        GlideClusterClient: An instance of GlideClusterClient connected to the discovered nodes.
    """
    addresses = [NodeAddress(host, port) for host, port in nodes_list]
    # Check `GlideClusterClientConfiguration` for additional options.
    config = GlideClusterClientConfiguration(
        addresses=addresses,
        client_name="test_cluster_client",
        # Enable this field if the servers are configured with TLS.
        # use_tls=True
    )
    return await GlideClusterClient.create(config)


async def app_logic(client: GlideClusterClient):
    """
    Executes the main logic of the application, performing basic operations
    such as SET, GET, PING, and INFO REPLICATION using the provided GlideClusterClient.

    Args:
        client (GlideClusterClient): An instance of GlideClusterClient.
    """
    # Send SET and GET
    set_response = await client.set("foo", "bar")
    Logger.log(LogLevel.INFO, "app", f"Set response is = {set_response!r}")

    get_response = await client.get("foo")
    assert isinstance(get_response, bytes)
    Logger.log(LogLevel.INFO, "app", f"Get response is = {get_response.decode()!r}")

    # Send PING to all primaries (according to Redis's PING request_policy)
    pong = await client.ping()
    Logger.log(LogLevel.INFO, "app", f"PING response is = {pong!r}")

    # Send INFO REPLICATION with routing option to all nodes
    info_repl_resps = await client.info([InfoSection.REPLICATION], AllNodes())
    Logger.log(
        LogLevel.INFO,
        "app",
        f"INFO REPLICATION responses from all nodes are=\n{info_repl_resps!r}",
    )


async def exec_app_logic():
    """
    Executes the application logic with exception handling.
    """
    while True:
        client = None
        try:
            client = await create_client()
            return await app_logic(client)
        except asyncio.CancelledError:
            raise
        except ClosingError as e:
            # If the error message contains "NOAUTH", raise the exception
            # because it indicates a critical authentication issue.
            if "NOAUTH" in str(e):
                Logger.log(
                    LogLevel.ERROR,
                    "glide",
                    f"Authentication error encountered: {e}",
                )
            else:
                Logger.log(
                    LogLevel.WARN,
                    "glide",
                    f"Client has closed and needs to be re-created: {e}",
                )
            raise e
        except TimeoutError as e:
            # A request timed out. You may choose to retry the execution based on your application's logic
            Logger.log(LogLevel.ERROR, "glide", f"TimeoutError encountered: {e}")
            raise e
        except ConnectionError as e:
            # The client wasn't able to reestablish the connection within the given retries
            Logger.log(LogLevel.ERROR, "glide", f"ConnectionError encountered: {e}")
            raise e
        except RequestError as e:
            # Other error reported during a request, such as a server response error
            Logger.log(LogLevel.ERROR, "glide", f"RequestError encountered: {e}")
            raise e
        except Exception as e:
            Logger.log(LogLevel.ERROR, "glide", f"Unexpected error: {e}")
            raise e
        finally:
            try:
                if client is not None:
                    await client.close()
            except Exception as e:
                Logger.log(
                    LogLevel.WARN,
                    "glide",
                    f"Error encountered while closing the client: {e}",
                )


def main():
    # In this example, we will utilize the client's logger for all log messages
    Logger.set_logger_config(LogLevel.INFO)
    # Optional - set the logger to write to a file
    # Logger.set_logger_config(LogLevel.INFO, file)
    asyncio.run(exec_app_logic())


if __name__ == "__main__":
    main()

3.3 Implementing Pub/Sub in Python

import asyncio
import logging
import os
import zlib
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple

from glide import (
    CoreCommands,
    GlideClusterClient,
    GlideClusterClientConfiguration,
    NodeAddress,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get the cluster endpoint from environment variables
REDIS_ENDPOINT = os.getenv('REDIS_ENDPOINT', 'valkey-demo.titbpn.clustercfg.use1.cache.amazonaws.com')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))

# Define channels and patterns as constants
EXACT_CHANNELS = {"ch1", "ch2"}
PATTERN_CHANNELS = {"chat*"}


class GlideClientManager:
    _instance = None

    def __init__(self):
        self.client = None

    @classmethod
    async def get_instance(cls, callback, context=None):
        if cls._instance is None:
            cls._instance = cls()
            cls._instance.client = await initialize_clients(callback, context)
        return cls._instance.client


# Create a single node address configuration to be reused
node_config = [NodeAddress(REDIS_ENDPOINT, REDIS_PORT)]


def callback(msg: CoreCommands.PubSubMsg, context: Any):
    logger.info(f"Received {msg}, context {context}")


async def initialize_clients(callback, context: Optional[object] = None):
    # Combined configuration for both listening and publishing
    client_config = GlideClusterClientConfiguration(
        node_config,
        pubsub_subscriptions=GlideClusterClientConfiguration.PubSubSubscriptions(
            channels_and_patterns={
                GlideClusterClientConfiguration.PubSubChannelModes.Exact: EXACT_CHANNELS,
                GlideClusterClientConfiguration.PubSubChannelModes.Pattern: PATTERN_CHANNELS
            },
            callback=callback,
            context=context,
        )
    )

    try:
        # Create a single client for both listening and publishing
        client = await GlideClusterClient.create(client_config)
        return client
    except Exception as e:
        logger.error(f"Failed to initialize client: {e}")
        raise


@asynccontextmanager
async def get_client(callback, context=None):
    client = None
    try:
        client = await initialize_clients(callback, context)
        yield client
    finally:
        if client:
            await client.close()


async def publish_message(client, message: str, channel: str):
    try:
        await client.publish(message, channel)
    except Exception as e:
        logger.error(f"Failed to publish message: {e}")
        raise


async def publish_compressed_message(client, message: str, channel: str):
    compressed_message = zlib.compress(message.encode())
    await client.publish(compressed_message, channel)


@dataclass
class PublishResult:
    success: bool
    message: str
    channel: str
    error: Optional[Exception] = None


async def publish_messages_batch(
        client: GlideClusterClient,
        messages: List[Tuple[str, str]],
        chunk_size: int = 100,
        timeout: Optional[float] = 30.0
) -> List[PublishResult]:
    """
    Publish multiple messages in batches with improved error handling and performance

    Args:
        client: GlideClusterClient instance
        messages: List of (message, channel) tuples
        chunk_size: Number of messages to process in each batch
        timeout: Maximum time in seconds to wait for batch completion

    Returns:
        List of PublishResult objects containing publish status and details
    """
    results: List[PublishResult] = []

    async def publish_single(message: str, channel: str) -> PublishResult:
        try:
            await client.publish(message, channel)
            return PublishResult(success=True, message=message, channel=channel)
        except Exception as e:
            logger.error(f"Failed to publish message to channel {channel}: {str(e)}")
            return PublishResult(success=False, message=message, channel=channel, error=e)

    # Process messages in chunks
    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]
        try:
            chunk_results = await asyncio.wait_for(
                asyncio.gather(*[
                    publish_single(message, channel)
                    for message, channel in chunk
                ]),
                timeout=timeout
            )
            results.extend(chunk_results)

        except asyncio.TimeoutError as e:
            logger.error(f"Publish batch timed out after {timeout}s")
            results.extend([
                PublishResult(False, message, channel, e)
                for message, channel in chunk
            ])

    return results


async def main():
    async with get_client(callback) as client:
        try:
            # Example of batch publishing
            messages = [
                ("Test message 1", "ch1"),
                ("Test message 2", "ch2")
            ]
            await publish_messages_batch(client, messages)

            # Your work/wait logic here
            await asyncio.sleep(1)  # Example delay

        except Exception as e:
            logger.error(f"Error in main: {e}")


if __name__ == "__main__":
    asyncio.run(main())

4. Additional Production Considerations

4.1 Configure Security and Authentication

Enable encryption in transit and configure access control according to your business requirements to strengthen security.
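As a sketch, on the client side this maps to valkey-glide's `use_tls` flag and `ServerCredentials`; the endpoint, username, and password below are placeholders. On the ElastiCache side, enable encryption in transit and set up RBAC users or an AUTH token when creating the cache:

```python
from glide import (
    GlideClusterClient,
    GlideClusterClientConfiguration,
    NodeAddress,
    ServerCredentials,
)


async def create_secure_client() -> GlideClusterClient:
    config = GlideClusterClientConfiguration(
        addresses=[NodeAddress("your-tls-cluster-endpoint", 6379)],  # placeholder
        use_tls=True,  # required once the cache enables in-transit encryption
        credentials=ServerCredentials(
            username="app-user",       # placeholder RBAC user
            password="your-password",  # placeholder; load from a secret store
        ),
    )
    return await GlideClusterClient.create(config)
```

In practice, fetch the password from AWS Secrets Manager or an environment variable rather than embedding it in code.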

4.2 Configure Connection Timeouts and Retries

Configure request_timeout and reconnect_strategy according to your application's latency requirements.
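In valkey-glide these correspond to the `request_timeout` (milliseconds) and `reconnect_strategy` configuration fields. The sketch below uses its `BackoffStrategy` with illustrative values and a placeholder endpoint; tune both to your latency budget:

```python
from glide import (
    BackoffStrategy,
    GlideClusterClient,
    GlideClusterClientConfiguration,
    NodeAddress,
)


async def create_tuned_client() -> GlideClusterClient:
    config = GlideClusterClientConfiguration(
        addresses=[NodeAddress("your-cluster-endpoint", 6379)],  # placeholder
        request_timeout=500,  # ms; fail fast rather than queueing behind slow calls
        # Delays between reconnect attempts grow exponentially with each
        # attempt (controlled by factor and exponent_base), capped at
        # num_of_retries growth steps.
        reconnect_strategy=BackoffStrategy(
            num_of_retries=5, factor=100, exponent_base=2
        ),
    )
    return await GlideClusterClient.create(config)
```

A short request timeout pairs well with application-level retries: a hung node surfaces as a quick TimeoutError instead of stalled latency for every caller.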

4.3 Graceful Error Handling

The sample exec_app_logic function already catches and logs errors, but in production these failures usually warrant direct notification. Consider an event-driven approach that triggers mitigation or retry strategies automatically, reducing the need for manual intervention.
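One lightweight way to structure this, sketched with only the standard library, is to wrap an operation in a retry loop that fires a notification callback (which could publish to SNS, a pager, or an event bus) once retries are exhausted. The names here are illustrative, not part of Glide:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_alerting(
    op: Callable[[], Awaitable[T]],
    notify: Callable[[Exception], None],
    max_retries: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Retry `op` with exponential backoff; call `notify` if all retries fail."""
    for attempt in range(max_retries):
        try:
            return await op()
        except Exception as exc:
            if attempt == max_retries - 1:
                notify(exc)  # hand off to an alerting/event pipeline
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError("unreachable")


# Example: an operation that fails twice, then succeeds on the third try.
async def demo() -> str:
    attempts = {"n": 0}

    async def flaky() -> str:
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise ConnectionError("transient failure")
        return "ok"

    return await run_with_alerting(flaky, notify=lambda exc: print(f"alert: {exc}"))


print(asyncio.run(demo()))  # → ok
```

Because transient errors are absorbed by the backoff loop and only terminal failures trigger `notify`, on-call engineers see a single actionable alert instead of a flood of retryable noise.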

4.4 Performance Monitoring and Metrics Collection

Valkey now supports slot-level metrics. Production environments should enable slot-level monitoring and watch per-slot key trends; the resulting observability gives operators a concrete basis for resharding and cluster-resizing decisions.
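Valkey 8.0 exposes per-slot metrics through the CLUSTER SLOT-STATS command, which Glide can reach via `custom_command`. The helper below is a hedged sketch that assumes the documented reply shape, a list of `[slot, [metric, value, ...]]` pairs; the commented Glide call shows how it might be wired up:

```python
from typing import Dict, Sequence


def parse_slot_stats(reply: Sequence[Sequence]) -> Dict[int, Dict[str, int]]:
    """Convert a CLUSTER SLOT-STATS reply into {slot: {metric: value}}."""
    stats: Dict[int, Dict[str, int]] = {}
    for slot, metrics in reply:
        pairs: Dict[str, int] = {}
        # Metrics arrive as a flat [name, value, name, value, ...] list.
        for name, value in zip(metrics[::2], metrics[1::2]):
            name = name.decode() if isinstance(name, bytes) else str(name)
            pairs[name] = int(value)
        stats[int(slot)] = pairs
    return stats


# With a Glide client, one might fetch the hottest slots by key count (sketch):
#   reply = await client.custom_command(
#       ["CLUSTER", "SLOT-STATS", "ORDERBY", "key-count", "LIMIT", "10"]
#   )
#   print(parse_slot_stats(reply))

sample = [[0, [b"key-count", 42]], [5461, [b"key-count", 7]]]
print(parse_slot_stats(sample))  # → {0: {'key-count': 42}, 5461: {'key-count': 7}}
```

Feeding these per-slot key counts into your metrics pipeline makes hot slots visible long before they become a resharding emergency.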

5. Conclusion

As this article shows, Valkey 8.0 delivers significant gains in both performance and functionality. Particularly noteworthy is Valkey Glide, the open-source client contributed by AWS, which gives developers a more capable and flexible option: it supports multiple languages including Python, Java, and Node.js, and its unified interface makes failover and connection management more reliable. Combined with Valkey 8.0's throughput (around 1.2M RPS), improved storage efficiency (about 20% space savings), and comprehensive metrics, the pairing of Valkey and Glide is a strong, complete foundation for building highly available, large-scale distributed caches, whether your priority is performance, cost, or developer convenience.

6. Further Reading

About the Author

Lin Ye (林业)

Solutions Architect at Amazon Web Services, responsible for consulting on and designing cloud solutions built on AWS. He has more than 14 years of development experience, has built an app serving tens of millions of users, and is a contributor to several open-source projects on GitHub. He has deep hands-on experience across gaming, IoT, smart cities, automotive, e-commerce, and other domains.