目录

Rust 微服务框架设计与实践

一、项目背景

工作中接触了微服务架构,大多基于 Java 或 Go。作为 Rust 爱好者,想尝试用 Rust 写一套适合实际业务的微服务框架。另外我自己写了一个记账App一直在运行中,自己也一直在用,于是刚好用Rust和微服务架构来重构一下。Rust上并没有已有的成熟微服务框架,于是我刚好自己去实现(或者说接入)微服务通信、上下文处理、trace等功能,来加深对微服务系统的理解。

项目地址:simplife-tech/simplife

二、项目结构

simplife
├── akasha              # 核心基础库
├── service-template    # 服务模板
├── account-service     # 账户服务
├── ledger-service      # 账本服务
├── agenda-service      # 日程服务
├── grpc-client         # gRPC 客户端封装
└── protos              # Protobuf 协议定义

三、akasha:核心基础库

akasha是整个框架的灵魂,它封装了微服务所需的所有基础设施。

架构图

flowchart TB subgraph Service[业务服务层] A[Account Service] B[Ledger Service] C[Agenda Service] end subgraph Akasha[akasha 核心库] subgraph Middleware[中间件层] M1[Trace 链路追踪] M2[Context 上下文] M3[LoginCheck 登录校验] end subgraph Core[核心组件] D[DB - MySQL 连接池] E[Redis - 缓存] F[Error - 统一错误处理] G[Log - 日志系统] H[Context - 请求上下文] end subgraph Infra[基础设施] I[MultiplexService HTTP/gRPC复用] J[OpenTelemetry 链路追踪] K[gRPC 封装] end end Service --> Middleware Middleware --> Core Core --> Infra

1. 中间件系统

akasha 提供了一套中间件机制,可以在请求处理前后做各种事情:

Trace 中间件 - 自动为每个 HTTP 请求生成 trace ID,并注入到响应头:

pub async fn trace_http<B>(
    mut req: Request<B>,
    next: Next<B>
) -> Result<impl IntoResponse, (StatusCode, String)> {
    opentelemetry::global::tracer_provider()
        .tracer("")
        .in_span(req.uri().path().to_string(), |cx| async move {
            let span = cx.span();
            span.set_attribute(Key::new("http.method").string(req.method().to_string()));
            span.set_attribute(Key::new("http.target").string(req.uri().to_string()));

            let trace_id = span.span_context().trace_id();
            let ctx = req.extensions_mut().get_mut::<Context>().unwrap();
            ctx.opentelemetry_context = cx.clone();

            let mut res = next.run(req).await;
            res.headers_mut().insert("simplife-trace-id", trace_id.to_string());
            Ok(res)
        }).await
}

Context 中间件 - 在请求扩展中注入自定义上下文,方便后续 handler 使用。

2. 统一错误处理

定义了统一的错误类型,所有错误最终都会转换成标准化的响应格式:

pub enum Error {
    ServiceError(i32, &'static str),  // 业务错误
    ServerError(String),               // 服务器错误
    NetWorkError,                      // 网络错误
    NotLogin,                          // 未登录
    NotExistUid,                       // UID 不存在
    RequestTimeout,                    // 请求超时
    BadRequest,                        // 参数错误
    UserHasNoFamily,                   // 业务特定错误
}

impl IntoResponse for Error {
    fn into_response(self) -> axum::response::Response {
        match self {
            Error::ServiceError(code, msg) => (
                StatusCode::OK,
                Json(Response::fail(code, msg))
            ).into_response(),
            Error::NotLogin => (
                StatusCode::OK,
                Json(Response::fail(-401, "未登录"))
            ).into_response(),
            // ... 其他错误
        }
    }
}

这样所有 handler 都可以统一返回 Result<T, Error>,框架会自动处理错误响应。

3. 数据库和 Redis 封装

数据库 - 基于 SQLx 的 MySQL 连接池:

#[derive(Debug, Clone)]
pub struct Db {
    pub pool: Pool<MySql>
}

impl Db {
    pub fn new(pool: Pool<MySql>) -> Db {
        Self { pool }
    }
}

pub async fn db_connect(db_url: &str, max_connections: u32) -> Result<Pool<MySql>, sqlx::Error> {
    MySqlPoolOptions::new()
        .max_connections(max_connections)
        .connect_with(options)
        .await
}

Redis - 基于 ConnectionManager 的连接复用:

#[derive(Clone)]
pub struct Redis {
    pub manager: ConnectionManager,
}

4. 链路追踪集成

akasha 集成了 OpenTelemetry 和 Jaeger,支持自动打点和传播:

// 宏定义自动打点
#[macro_export]
macro_rules! instrumented_redis_cmd {
    ($oc:expr, $conn:expr, $key:expr, $($arg:tt)*) => {{
        let tracer = akasha::opentelemetry::global::tracer_provider().tracer("");
        let mut span = tracer.start_with_context(name, &$oc);
        span.set_attribute(akasha::opentelemetry::Key::new("db.key").string($key.to_string()));
        let result = $conn.$($arg)*.await;
        span.end();
        result
    }};
}

5. 日志系统

基于 log4rs 的统一日志配置:

pub fn init_config(level: log::LevelFilter) -> log4rs::Handle {
    let stdout = ConsoleAppender::builder().build();
    let file = FileAppender::builder()
        .encoder(Box::new(PatternEncoder::new("{d} - {m}{n}")))
        .build("log/simplife.log")
        .unwrap();

    let config = Config::builder()
        .appender(Appender::builder().build("stdout", Box::new(stdout)))
        .appender(Appender::builder().build("file", Box::new(file)))
        .build(Root::builder().appenders(vec!["stdout", "file"]).build(level))
        .unwrap();

    log4rs::init_config(config).unwrap()
}

同时输出到控制台和文件,方便开发和排查问题。

四、核心设计亮点

1. HTTP/gRPC 端口复用

这是 akasha 最巧妙的设计之一。通过 MultiplexService 实现 HTTP 和 gRPC 复用同一个端口:

pub struct MultiplexService<A, B> {
    rest: A,        // REST 服务
    grpc: B,        // gRPC 服务
    rest_ready: bool,
    grpc_ready: bool,
}

impl<A, B> Service<Request<hyper::Body>> for MultiplexService<A, B> {
    fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
        if is_grpc_request(&req) {
            // 根据 Content-Type 判断是否为 gRPC 请求
            self.grpc.call(req)
        } else {
            self.rest.call(req)
        }
    }
}

判断逻辑很简单:检查 Content-Type 是否以 application/grpc 开头。这样运维只需要管理一个端口,大大简化了部署。

2. 请求上下文传递

通过 Axum 的 extensions 机制,在请求生命周期内传递上下文:

#[derive(Clone, Default)]
pub struct Context {
    pub opentelemetry_context: opentelemetry::Context,
    pub keys_i64: HashMap<&'static str, i64>
}

// 在 middleware 中注入
req.extensions_mut().insert(ctx);

// 在 handler 中取出
let ctx = req.extensions().get::<Context>().unwrap();

3. 优雅关闭

支持接收信号后优雅关闭服务:

pub async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    println!("signal received, starting graceful shutdown");
}

4. 服务模板化

service-template 是一个标准的服务模板,新建服务只需复制:

cp -r service-template new-service

然后修改 Cargo.toml 和注册自己的 gRPC 服务即可。这种"约定优于配置"的思路让新服务可以快速启动。

五、使用示例

启动服务

cargo run -- \
    --db mysql://user:pass@localhost:3306/mydb \
    --redis redis://localhost:6379 \
    --listen-ip 0.0.0.0 \
    --listen-port 27001

编写 handler

async fn create_user(
    State(state): State<AppState>,
    Json(req): Json<CreateUserRequest>,
) -> Result<Json<UserResponse>, Error> {
    let user = state.db.create_user(&req).await?;
    Ok(Json(user))
}

六、总结

simplife 是我对 Rust 微服务架构的一次完整探索。把底层功能集成独立出来形成 akasha 这个核心库,封装了:

  • 中间件系统(链路追踪、上下文、登录校验)
  • 统一错误处理
  • 数据库和 Redis 封装
  • 日志和可观测性
  • HTTP/gRPC端口复用

Rust 的类型系统和零成本抽象让写微服务变得既安全又高效。