/home/tnishinaga/TechMEMO

日々行ったこと、面白かったことを書き留めます。

iperf3の実装を読んでRustで簡易版を実装してみた

iperfを使うことはあっても中の通信がどうなっているのか知らなかったので、調べてみました。

自作のコードで簡単なスループット計測が行えることをゴールとします。

環境

今回の調査はm1 mac book上でiperfを実行し、その通信内容をwiresharkで見ながら行いました。

iperfのバージョンは以下になります。

~ ❯❯❯ iperf3-darwin -v
iperf 3.8.1 -  -- Apple version iperf3-107 (cJSON 1.7.13)
Darwin hostname.local 23.1.0 Darwin Kernel Version 23.1.0: Mon Oct  9 21:28:12 PDT 2023; root:xnu-10002.41.9~6/RELEASE_ARM64_T8103 arm64
Optional features available: sendfile / zerocopy

コードは2023/12/22時点のものを参照しました。

github.com

通信の概要

通信手順の概要は以下のようになっています。(図がなくてすみません)。

  1. クライアントからサーバーにテストの設定を送る
  2. サーバーからクライアントにデータ送信開始の指示を出す
  3. クライアントからサーバーに一定量データを送る
  4. 結果をお互いに交換する

もう少し細かい手順を以下に示します。 cはclient、sはserver、矢印はどちら向きの通信かを表しています。 iperf3のテストではコマンド送受信用(cmd)とデータ通信用(data)の2つのコネクションを使用します。 そのため、どちらのコネクションを利用するかを通信方向の後に記します。

  1. c->s, cmd: Cookie(セッション識別用の文字列)を送る
  2. s->c, cmd: Param exchange(9)を送る
  3. c->s, cmd: 設定の書かれたJSONファイルサイズ(32bit, big endian)を送る
  4. c->s, cmd: JSONファイルの中身を送る
  5. s->c, cmd: CREATE_STREAMS(10)を送る
  6. c->s, data: データ通信用のコネクションを作り、そこから手順1と同一のCookieを送る
  7. s->c, cmd: TEST_START(1)を送る
  8. s->c, cmd: TEST_RUNNING(2)を送る
  9. c->s, data: データを送る
  10. c->s, cmd: TEST_END(4)を送る
  11. c->s, cmd: EXCHANGE_RESULTS(13)を送る
  12. c->s, cmd: JSONのファイルサイズ(32bit, big endian)をおくる
  13. c->s, cmd: 計測結果のJSONファイルを送る
  14. s->c, cmd: JSONのファイルサイズ(32bit, big endian)をおくる
  15. s->c, cmd: 計測結果のJSONファイルを送る
  16. c->s, cmd: IPERF_DONE(16)をおくる

以降、更に詳細について見ていきます。

コマンド

iperf3は1byteのコマンド(正確にはiperfのstate)を送って制御を行っています。

ここで使うコマンドは iperf_api.h の107行目付近に定義されています。

github.com

iperf3ではテストの識別のため、cookieと呼ばれるランダムな文字列を使います。 このCookieabcdefghijklmnopqrstuvwxyz234567 の中からランダムに選ばれた36文字と空文字の合計37byteで構成されています(参考元: iperf/src/iperf_util.c at ec06f7b43854153044c0a5e9ea2845e07262dcf8 · esnet/iperf · GitHub)。

文字テーブルから 01 が外されている理由は不明です。 理由をご存じの方がいたら教えてください。

設定のJSON

テストの序盤ではクライアントからサーバーに設定をJSONで送ります。 実際にiperf3-darwin -c localhost コマンドを実行すると以下のようなJSONが送られます。

{
    "tcp": true,
    "omit": 0,
    "time": 10,
    "parallel": 1,
    "len": 131072,
    "pacing_timer": 1000,
    "client_version": "3.8.1"
}

各設定の種類や詳細については send_parameters関数 から読んでいくと良さそうです。

最低限動けば良いレベルであれば、テスト時間を示すtimeとTCPのブロックサイズを決めるlenだけ注意していれば良かったです。 それ以外のomitやpacing_timerについては、どの部分に影響のある設定か把握しきれていません。

結果のJSON

計測が終わった後、サーバーとクライアントの両方で計測した結果を交換します。

実際にやり取りされているJSONはこんな感じになっています。

{
    "cpu_util_total": 85.569230769230771,
    "cpu_util_user": 10.461538461538462,
    "cpu_util_system": 75.1076923076923,
    "sender_has_retransmits": 1,
    "streams": [
        {
            "id": 1,
            "bytes": 506824,
            "retransmits": 0,
            "jitter": 0,
            "errors": 0,
            "packets": 0,
            "start_time": 0,
            "end_time": 0.000238
        }
    ]
}

streamsの中身は配列になっており、idは1番開始になっています。 オリジナルの実装では1秒毎の計測結果をstreamsに入れて送っているようですが、n秒分まとめて送っても受け付けてもらえました。

iperf/src/iperf_api.c at ec06f7b43854153044c0a5e9ea2845e07262dcf8 · esnet/iperf · GitHub

サンプルコードの実装と動作確認

以上でなんとなく仕組みがわかったので、簡単なiperf3クライアントをRustで実装してみました。 とりあえず動くこと優先で作ったので、コードが汚いのはご了承ください。

// main.rs

use heapless;
use log::debug;
use num_enum::{FromPrimitive, IntoPrimitive};
use rand::{Rng, RngCore, SeedableRng};
use rand_xorshift;
use serde::{Deserialize, Serialize};

const DEFAULT_TCP_BLOCKSIZE: usize = 128 * 1024;

#[derive(IntoPrimitive, FromPrimitive, Debug)]
#[repr(u8)]
enum IperfApi {
    TestStart = 1,
    TestRunning = 2,
    ResultRequest = 3,
    TestEnd = 4,
    StreamBegin = 5,
    StreamRunning = 6,
    StreamEnd = 7,
    AllStreamsEnd = 8,
    ParamExchange = 9,
    CreateStreams = 10,
    ServerTerminate = 11,
    ClientTerminate = 12,
    ExchangeResults = 13,
    DisplayResults = 14,
    IperfStart = 15,
    IperfDone = 16,
    #[num_enum(default)]
    Unknown,
}

#[derive(Serialize, Deserialize)]
struct IperfConfig {
    tcp: bool,
    omit: i32,
    time: i32,
    parallel: isize,
    /// block size
    len: usize,
    /// pacing timer in microseconds
    pacing_timer: i32,
    client_version: &'static str,
}

impl Default for IperfConfig {
    fn default() -> Self {
        Self {
            tcp: true,
            omit: 0,
            time: 10,
            parallel: 1,
            /// DEFAULT_TCP_BLOCKSIZE
            len: DEFAULT_TCP_BLOCKSIZE,
            /// DEFAULT_PACING_TIMER
            pacing_timer: 1000,
            client_version: "3.8.1",
        }
    }
}

#[derive(Serialize, Deserialize, Default, Debug)]
struct IperfResult<const N: usize> {
    cpu_util_total: f64,
    cpu_util_user: f64,
    cpu_util_system: f64,
    sender_has_retransmits: i32,
    streams: heapless::Vec<IperfStreamResult, N>,
}

#[derive(Serialize, Deserialize, Default, Debug)]
struct IperfStreamResult {
    id: i32,
    bytes: u64,
    retransmits: i32,
    jitter: i32,
    errors: i32,
    packets: i32,
    start_time: f64,
    end_time: f64,
}

use rand::distributions::Alphanumeric;
use std::error::Error;
use std::time::{Duration, SystemTime};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpSocket, TcpStream};

fn make_cookie(cookie: &mut [u8; 37], seed: u64) {
    let mut rng = rand_xorshift::XorShiftRng::seed_from_u64(seed);
    // do not forget '\0' termination
    // TODO: choose from "abcdefghijklmnopqrstuvwxyz234567"
    // ref: make_cookie function
    let (last, c) = cookie.split_last_mut().unwrap();
    c.iter_mut().for_each(|x| *x = rng.sample(Alphanumeric));
    *last = b'\0';
    debug!("cookie: {:?}", String::from_utf8(cookie.to_vec()).unwrap());
}

async fn iperf3_client() -> Result<(), Box<dyn Error>> {
    // Overview
    //  1. c->s Cookie(セッション識別用の文字列)を送る
    //  2. s->c Param exchange(9)を送る
    //  3. c->s 設定の書かれたJSONファイルのサイズを4byteで送る
    //  4. c->s JSONファイルの中身を送る
    //  5. s->c CREATE_STREAMS(10)を送る
    //  6. c->s Cookieを送る
    //  7. s->c TEST_START(1)を送る
    //  8. s->c TEST_RUNNING(2)を送る
    //  9. c->s データを送る
    //  10. c->s TEST_END(4)を送る
    //  11. s->c EXCHANGE_RESULTS(13)を送る
    //  12. c->s JSONのファイルサイズを4byteでおくる
    //  13. c->s 計測結果のJSONファイルを送る
    //  14. s->c JSONのファイルサイズを4byteでおくる
    //  15. s->c 計測結果のJSONファイルを送る
    //  16. c->s IPERF_DONE(16)をおくる

    // Connect to a peer
    let mut command_stream = TcpStream::connect("127.0.0.1:5201").await?;
    let addr = command_stream.local_addr().unwrap();
    println!("command_stream addr = {:?}", addr);

    let seed = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let mut rng = rand_xorshift::XorShiftRng::seed_from_u64(seed);

    // create cookie
    let mut cookie = [0u8; 37];
    make_cookie(&mut cookie, seed);

    command_stream.write(&cookie).await?;

    let mut ack = [0u8; 1];
    command_stream.read(&mut ack).await?;

    match IperfApi::try_from(ack[0]).unwrap() {
        IperfApi::ParamExchange => (),
        _ => unimplemented!(),
    }

    // crate json
    let config = IperfConfig::default();
    let config_str = serde_json_core::to_string::<IperfConfig, 512>(&config)
        .unwrap()
        .to_string();
    let config_str_size = u32::try_from(config_str.len()).unwrap();
    println!("config_str: {:?}", config_str);

    // send json size
    command_stream.write(&config_str_size.to_be_bytes()).await?;
    // send json
    command_stream.write(config_str.as_bytes()).await?;

    // read command
    let mut ack = [0u8; 1];
    command_stream.read(&mut ack).await?;
    match IperfApi::try_from(ack[0]).unwrap() {
        IperfApi::CreateStreams => (),
        _ => unimplemented!(),
    }
    println!("got CreateStreams");

    let mut data_stream = TcpStream::connect("127.0.0.1:5201").await?;

    // resend cookie
    println!("send cookie to data stream");
    data_stream.write(&cookie).await?;

    let addr = data_stream.local_addr().unwrap();
    println!("data stream addr = {:?}", addr);

    // get test start
    println!("waiting to receive TestStart");
    let mut ack = [0u8; 1];
    command_stream.read(&mut ack).await?;
    match IperfApi::try_from(ack[0]).unwrap() {
        IperfApi::TestStart => (),
        _ => unimplemented!(),
    }

    println!("waiting to receive TestRunning");
    let mut ack = [0u8; 1];
    command_stream.read(&mut ack).await?;
    match IperfApi::try_from(ack[0]).unwrap() {
        IperfApi::TestRunning => (),
        _ => unimplemented!(),
    }

    let mut stream_result = IperfStreamResult::default();
    stream_result.id = 1;

    let start_time = SystemTime::now();
    while start_time.elapsed()?.as_secs() < config.time.try_into().unwrap() {
        // send data
        // ref: https://hanya-orz.hatenablog.com/entry/2020/08/05/131158
        let mut data = [0; DEFAULT_TCP_BLOCKSIZE];
        rng.fill_bytes(&mut data);
        let _n = data_stream.write_all(&data).await?;
        // stream_result.bytes += u64::try_from(n).unwrap();
        stream_result.bytes += u64::try_from(DEFAULT_TCP_BLOCKSIZE).unwrap();
    }
    let end_time = SystemTime::now();

    stream_result.end_time = end_time.duration_since(start_time).unwrap().as_secs_f64();

    command_stream.write(&[IperfApi::TestEnd.into()]).await?;

    let mut ack = [0u8; 1];
    command_stream.read(&mut ack).await?;
    match IperfApi::try_from(ack[0]).unwrap() {
        IperfApi::ExchangeResults => (),
        _ => unimplemented!(),
    }
    println!("got ExchangeResults");

    let mut result: IperfResult<1> = IperfResult::default();
    result.streams.push(stream_result).unwrap();
    let result_str = serde_json_core::to_string::<IperfResult<1>, 512>(&result).unwrap();
    let result_str_size = u32::try_from(result_str.len()).unwrap();
    println!("result_str: {:?}", result_str);

    // send json size
    command_stream.write(&result_str_size.to_be_bytes()).await?;
    command_stream.flush().await?;
    // send json
    command_stream.write(result_str.as_bytes()).await?;

    // read result length
    let mut server_result_size_buf = [0u8; 4];
    command_stream.read(&mut server_result_size_buf).await?;
    let mut buf = [0; 512];
    command_stream.read(&mut buf).await?;

    command_stream.write(&[IperfApi::IperfDone.into()]).await?;

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    iperf3_client().await.unwrap();

    Ok(())
}
# Cargo.toml

[package]
name = "iperf3-rs"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
heapless = { version = "0.8.0", features = ["serde"] }
log = { version = "0.4.20", default-features = false }
num_enum = { version = "0.7.1", default-features = false }
rand = { version = "0.8.5", default-features = false }
rand_xorshift = "0.3.0"
# rust-fsm = { version = "0.6.1", default-features = false, features = ["dsl"] }
serde = { version = "1.0.193", default-features = false, features = [
    "derive",
    "serde_derive",
] }
serde-json-core = "0.5.1"
tokio = { version = "1", features = ["full"] }

実行結果

上記コードを実行したところ、とりあえずエラーを出さずテストを完了できました。 結果は大体20Gbps程度でした。

-----------------------------------------------------------
Server listening on 5201
-----------------------------------------------------------
Accepted connection from 127.0.0.1, port 54195
[  5] local 127.0.0.1 port 5201 connected to 127.0.0.1 port 54196
[ ID] Interval           Transfer     Bitrate         Rwnd
[  5]   0.00-1.00   sec  2.27 GBytes  19.5 Gbits/sec  1.22 MBytes
[  5]   1.00-2.00   sec  2.33 GBytes  20.0 Gbits/sec  1.22 MBytes
[  5]   2.00-3.00   sec  2.27 GBytes  19.5 Gbits/sec  2.24 MBytes
[  5]   3.00-4.00   sec  2.28 GBytes  19.6 Gbits/sec  2.24 MBytes
[  5]   4.00-5.00   sec  2.27 GBytes  19.5 Gbits/sec  2.24 MBytes
[  5]   5.00-6.00   sec  2.32 GBytes  20.0 Gbits/sec  2.24 MBytes
[  5]   6.00-7.00   sec  2.30 GBytes  19.8 Gbits/sec  2.24 MBytes
[  5]   7.00-8.00   sec  2.33 GBytes  20.0 Gbits/sec  2.24 MBytes
[  5]   8.00-9.00   sec  2.30 GBytes  19.7 Gbits/sec  2.24 MBytes
[  5]   9.00-10.00  sec  2.33 GBytes  20.0 Gbits/sec  2.24 MBytes
[  5]  10.00-10.00  sec   128 KBytes  9.36 Gbits/sec  2.12 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate
[  5]   0.00-10.00  sec  23.0 GBytes  19.8 Gbits/sec                  receiver

ちなみに本家iperf3を使うと117Gbps程度出たので、約6倍程度の性能差があります。

~ ❯❯❯ iperf3-darwin -c 127.0.0.1
Connecting to host 127.0.0.1, port 5201
[  5] local 127.0.0.1 port 54220 connected to 127.0.0.1 port 5201
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd          RTT
[  5]   0.00-1.00   sec  13.3 GBytes   114 Gbits/sec    0   4.00 MBytes   1ms
[  5]   1.00-2.00   sec  13.7 GBytes   117 Gbits/sec    0   4.00 MBytes   1ms
[  5]   2.00-3.00   sec  13.7 GBytes   118 Gbits/sec    0   4.00 MBytes   1ms
[  5]   3.00-4.00   sec  13.6 GBytes   117 Gbits/sec    0   4.00 MBytes   1ms
[  5]   4.00-5.00   sec  13.5 GBytes   116 Gbits/sec    0   4.00 MBytes   1ms
[  5]   5.00-6.00   sec  13.6 GBytes   117 Gbits/sec    0   4.00 MBytes   1ms
[  5]   6.00-7.00   sec  13.7 GBytes   117 Gbits/sec    0   4.00 MBytes   1ms
[  5]   7.00-8.00   sec  13.7 GBytes   117 Gbits/sec    0   4.00 MBytes   1ms
[  5]   8.00-9.00   sec  13.5 GBytes   116 Gbits/sec    0   4.00 MBytes   1ms
[  5]   9.00-10.00  sec  13.7 GBytes   117 Gbits/sec    0   4.00 MBytes   1ms
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec   136 GBytes   117 Gbits/sec    0             sender
[  5]   0.00-10.00  sec   136 GBytes   117 Gbits/sec                  receiver

iperf Done.

もうちょっとiperfの設計や実装を理解して真似すると速くなるのかもしれません。

以上です。