用 Rust 來做簡易 Ngrok 服務(上)

#rust #ngrok
用 Rust 來做簡易 Ngrok 服務(上)
五倍技術部
技術文章
用 Rust 來做簡易 Ngrok 服務(上)

不知道大家在開發網站是不是有遇過需要與其他人共享或進行測試的情境,雖然可以通過部署後給對方網址,但這樣其實蠻花時間,而且通常這個過程會一直反覆的做,這時候可以使用 Ngrok,來讓我們的本機 Server 可以更方便的讓對方做測試。

什麼是 Ngrok?

Ngrok 是一個反向代理工具,可以將本地網路上的服務到公共網路上也能使用。簡單來說,Ngrok 提供了一個安全的通道,讓外部網路可以訪問在你本地機器上運行的應用。這是通過將公共網路上的一個端點(通常是一個 URL 或者一個固定的 IP 地址和端口)映射到你本地機器的一個特定端口來實現的。

Ngrok 工具

雖然 Ngrok 是一個很常見的服務,不過這次想要自己用 Rust 來打造一個簡易的 Ngrok 的服務,說不定之後就可以自己用自己打造的軟體來做這些事,這就是軟體工程師的浪漫(?

在使用 Rust 來實作一個簡易 Ngrok 的服務之前,有一些前置作業和準備需要完成:

環境準備

  • 確保電腦已安裝 Rust
  • 有一個 Domain Name(不是必須,但如果最後要在別台電腦測試的話就需要)
  • 任一雲端主機服務(會用 DigitalOcean 示範)

準備好這些之後,接下來介紹一下在這個專案中,有兩個主要的部分:

  • Ngrok Server:這是核心的 Ngrok 服務,負責建立與客戶端 WebSocket 連線,並將 HTTP 請求轉發到相應的本地後端服務。
  • Ngrok Client:Ngrok Client 與 Ngrok Server 建立 Websocket 連線,並負責接收從 Ngrok Server 來的 HTTP 請求,然後轉發到本地後端服務。

由於篇幅問題,所以本篇文章會先介紹 Ngrok Server 的部分。

建立新專案 Ngrok server

安裝相關套件:

[dependencies]
actix = "0.13.1"
actix-rt = "2.9.0"
actix-web = "4.4.0"
actix-web-actors = "4.2.0"
diesel = { version = "2.1.1", features = ["postgres"] }
lazy_static = "1.4.0"
serde = { version = "1.0.188", features = ["derive"] }
serde_derive = "1.0.188"
serde_json = "1.0.107"
tokio = { version = "1.32.0", features = ["full"] }
  • actix: 提供 actor 模型的框架。
  • actix-web: 用於建立 Web 服務。
  • actix-web-actors: 提供 WebSocket 的 handler。
  • serde: 用於轉換 JSON 格式。

建立一個 Server

首先會建立一個基本的 HTTP 伺服器,還有一個 WebSocket 路由,會使用 actix-web 的 WebSocket API 來實現。

use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Result};
use actix_web::web::Payload;
use actix_web_actors::ws;

async fn ws_index(req: HttpRequest, stream: Payload) -> Result<HttpResponse> {
    Ok(HttpResponse::Ok().finish())
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/ws/", web::get().to(ws_index))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

WebSocket 伺服器邏輯

接下來,需要建立一個 WebSocket Actor,這個 Actor 將處理 WebSocket 連接。

use actix::{Actor, StreamHandler};
use actix_web_actors::ws

struct MyWs;

impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => ctx.text(text),
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => (),
        }
    }
}

async fn ws_index(req: HttpRequest, stream: Payload) -> Result<HttpResponse> {
    let res = ws::start(MyWs {}, &req, stream);
    res
}

上面的 MyWs Actor 會回應所有接收到的 WebSocket 資料。

測試 WebSocket 是否連接

剛剛已經建立了簡單的 WebSocket,接下來要測試有沒有建立成功。

我們可以打開瀏覽器,在畫面上按右鍵點擊檢查,接著點擊 Console 分頁,貼上以下的程式碼:

// 建立新的 WebSocket 連接
const ws = new WebSocket('ws://127.0.0.1:8080/ws/');

// 當連接打開時
ws.addEventListener('open', function(event) {
    console.log('WebSocket is open now.');
    ws.send('Hello Server!');
});

// 當收到消息時
ws.addEventListener('message', function(event) {
    console.log('WebSocket message received:', event.data);
});

這個測試會建立一個新的 WebSocket 連接,並在連接成功建立後發送一個 Hello Server! 消息。同時也會監聽從服務器發送過來的任何消息。

Tunnel

在 WebSocket 中實現一個隧道(Tunnel)概念,這個隧道會將來自公網的請求轉發到用戶的本地 Server。

建立隧道

為了要建立隧道,在 MyWs 加上 ID,讓隧道之後可以去判斷並且轉發請求。

struct MyWs {
    // 假設每個隧道有一個唯一的 ID
    tunnel_id: String,
}

增加新的路由和函式

async fn public_endpoint(req: HttpRequest) -> Result<HttpResponse> {
    Ok(HttpResponse::Ok().body("This is a public endpoint"))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/ws/", web::get().to(ws_index))
            .route("/public/", web::get().to(public_endpoint))  // <- 新增的路由
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

更新 StreamHandler

StreamHandler,我們需要加入請求轉發和其他自定義消息的處理邏輯。

可能會有多種不同類型的自定義消息,例如建立隧道、關閉隧道、轉發 HTTP 請求等。一種常見的做法是使用 JSON 來處理這些消息。

例如,建立一個隧道的消息可能看起來像這樣:

{
  "type": "create_tunnel",
  "data": {
    "tunnel_id": "some_unique_id"
  }
}

StreamHandler 中,可以這樣處理:

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => {
                let message: serde_json::Value = serde_json::from_str(&text).unwrap();
    if let Some(msg_type) = message.get("type").and_then(|v| v.as_str()) {
        match msg_type {
            "create_tunnel" => {
                // 處理建立隧道的邏輯
                let tunnel_id = message["data"]["tunnel_id"].as_str().unwrap();
                self.tunnel_id = tunnel_id.to_string();
            },
            // 其他自定義消息類型
            _ => (),
        }
    }
            },
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => (),
        }
    }
}

處理請求轉發

請求轉發涉及到兩個主要部分:

  • 從公網接收 HTTP 請求:這部分會在你的 public_endpoint 中處理。
  • 將請求轉發到正確的 WebSocket 連接:這部分會在你的 StreamHandler 實現中處理。
#[macro_use]
extern crate lazy_static;

use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::MutexGuard;

lazy_static! {
    static ref GLOBAL_MAP: Mutex<HashMap<String, actix::Addr<MyWs>>> = Mutex::new(HashMap::new());
}

async fn ws_index(req: HttpRequest, stream: Payload) -> Result<HttpResponse> {
    let tunnel_id = "test_tunnel".to_string();
    let my_ws = MyWs { tunnel_id };
    let res = ws::start(my_ws, &req, stream);
    res
}

async fn public_endpoint(req: HttpRequest) -> Result<HttpResponse> {
    let tunnel_id = req
        .match_info()
        .get("tunnel_id")
        .unwrap_or("default_tunnel_id")
        .to_string();

    // 獲取 Mutex 的鎖
    let lock: MutexGuard<HashMap<String, actix::Addr<MyWs>>> = GLOBAL_MAP.lock().unwrap();

    // 使用鎖來訪問 HashMap
    if let Some(ctx) = lock.get(&tunnel_id) {
        // 將請求轉發到這個 WebSocket 連接
    }

    Ok(HttpResponse::Ok().body("This is a public endpoint"))
}

加入新的套件模組

新增以下:

use actix::AsyncContext;
use serde::{Deserialize, Serialize};
use std::sync::MutexGuard;

新增這些的原因是因為我們需要在 StreamHandler 中處理自定義消息,並且需要在 public_endpoint 中解析 HTTP 請求。

新增全域變數

新增一個 lazy_static! 區塊:

lazy_static! {
    static ref SUBDOMAIN_MAP: HashMap<String, String> = {
        let mut map = HashMap::new();
        map.insert("tunnel1.ngrok.me".to_string(), "tunnel1".to_string());
        map
    };
}

這個全域變數會用來將子域名映射到隧道 ID。

新增新的 struct

#[derive(Serialize, Deserialize)]
struct HttpRequestStruct {
    method: String,
    path: String,
    headers: HashMap<String, String>,
    body: Option<String>,
}

struct MyCustomMessage {
    serialized_request: String,
}

impl actix::Message for MyCustomMessage {
    type Result = ();
}

HttpRequestStruct 會用來解析 HTTP 請求,MyCustomMessage 會用來將請求轉發到 WebSocket 連接。

更新 Actor 實現

  • 找到 MyWs 的 Actor 實現區塊。
  • 在該區塊中新增 started 函式:
impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
    // 啟動時會處理 <--
    fn started(&mut self, ctx: &mut Self::Context) {
        let mut global_map = GLOBAL_MAP.lock().unwrap();
        println!("WebSocket started for tunnel_id: {}", self.tunnel_id);
        global_map.insert(self.tunnel_id.clone(), ctx.address().clone());
    }
}
  • 新增 actix::Handler的實現:
impl actix::Handler<MyCustomMessage> for MyWs {
    type Result = ();

    fn handle(&mut self, msg: MyCustomMessage, ctx: &mut Self::Context) {
        // 將 `msg.serialized_request` 發送到 WebSocket
        ctx.text(msg.serialized_request);
    }
}

更新 WebSocket 消息處理

  • 找到 StreamHandler 的實現區塊。
  • 在 handle 方法中新增新的消息處理邏輯:
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => {
                let mut message: Option<serde_json::Value> = None;

                if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
                    println!("Received message: {:?}", parsed);
                    message = Some(parsed);
                } else {
                    eprintln!("Failed to parse the message: {}", text);
                }

                ctx.text(r#"{"type":"create_tunnel","data":{"tunnel_id":"tunnel_id_1234567890"}}"#);
                if let Some(ref message) = message {
                    if let Some(msg_type) = message.get("type").and_then(|v| v.as_str()) {
                        match msg_type {
                            "create_tunnel" => {
                                let tunnel_id = message["data"]["tunnel_id"].as_str().unwrap();
                                println!("Creating tunnel with id: {}", tunnel_id);
                                self.tunnel_id = tunnel_id.to_string();
                            }
                            "response_from_service" => {
                                println!("Received a 'response_from_service' message.");
                                let response_data = message["data"].clone();
                                handle_service_response(response_data, ctx);
                            }
                            _ => (),
                        }
                    }
                }
            }
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => (),
        }
    }
}

新增請求轉發邏輯

  • 新增 send_http_responsehandle_service_response 函式:
fn send_http_response(
    status_code: u64,
    headers: &serde_json::Map<String, serde_json::Value>,
    _body: &str,
    ctx: &mut ws::WebsocketContext<MyWs>,
) {
    let response = format!(
        "HTTP/1.1 {} OK\r\n{}",
        status_code,
        headers
            .iter()
            .map(|(k, v)| format!("{}: {}", k, v))
            .collect::<Vec<String>>()
            .join("\r\n")
    );

    ctx.text(response);
}

fn handle_service_response(response_data: serde_json::Value, ctx: &mut ws::WebsocketContext<MyWs>) {
    let status_code = response_data["status_code"].as_u64().unwrap_or_else(|| {
        eprintln!("Status code not found");
        200
    });
    let empty_map = serde_json::Map::new();
    let headers = response_data["headers"].as_object().unwrap_or(&empty_map);
    let body = response_data["body"].as_str().unwrap_or("");

    send_http_response(status_code, headers, body, ctx);
}

send_http_response 會將 HTTP response 發送到 WebSocket 連接,handle_service_response 會處理來自後端服務的響應。

  • 更新 public_endpoint 函式:
async fn public_endpoint(req: HttpRequest) -> Result<HttpResponse> {
    let tunnel_id = req
        .match_info()
        .get("tunnel_id")
        .unwrap_or("default_tunnel_id")
        .to_string();

    let lock: MutexGuard<HashMap<String, actix::Addr<MyWs>>> = GLOBAL_MAP.lock().unwrap();

    if let Some(ctx) = lock.get(&tunnel_id) {
        let http_request = HttpRequestStruct {
            method: req.method().to_string(),
            path: req.path().to_string(),
            headers: req
                .headers()
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
                .collect(),
            body: None,
        };

        let serialized_request = serde_json::to_string(&http_request).unwrap();
        ctx.do_send(MyCustomMessage { serialized_request })
    } else {
        eprintln!("No WebSocket found for tunnel_id: {}", tunnel_id)
    }
    Ok(HttpResponse::Ok().body("This is a public endpoint"))
}

public_endpoint 會將 HTTP 請求轉發到 WebSocket 連接。

增加動態路由

async fn dynamic_routing(req: HttpRequest) -> actix_web::Result<HttpResponse> {
    let tunnel_id = "test_tunnel".to_string();

    let lock = GLOBAL_MAP.lock().unwrap_or_else(|err| {
        eprintln!("Failed to acquire lock: {}", err);
        panic!("Cannot acquire lock");
    });

    if let Some(ctx) = lock.get(&tunnel_id) {
        let http_request = HttpRequestStruct {
            method: req.method().to_string(),
            path: req.path().to_string(),
            headers: req
                .headers()
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
                .collect(),
            body: None,
        };

        let serialized_request = serde_json::to_string(&http_request).unwrap();
        ctx.do_send(MyCustomMessage { serialized_request });
    } else {
        eprintln!("No WebSocket found for tunnel_id: {}", tunnel_id);
    }

    Ok(HttpResponse::Ok().finish())
}

動態路由的目的是為了讓我們可以使用子域名來訪問隧道。

更新主函式

  • 新路由和其他設置以反映新的變化。
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/ws/", web::get().to(ws_index))
            .service(web::resource("/public/").route(web::get().to(public_endpoint)))
            .default_service(web::route().to(dynamic_routing))
    })
    .bind(("127.0.0.1", 8081))?
    .run()
    .await
}

主要功能:

最後,我們來看一下這個 Ngrok Server 的主要功能:

  • WebSocket 連接建立(MyWs): 當 WebSocket 連線建立後,會將連線存入 GLOBAL_MAP。
  • HTTP 請求轉發(publicendpoint、dynamicrouting): 當收到 HTTP 請求後,會查找相應的 WebSocket 連線並轉發請求。
  • 消息處理(handleserviceresponse): 負責處理來自客戶端的 HTTP 響應。