nvmf <1> - introduction (feat. ChatGPT)
ChatGPT를 사용하면 블로그를 편하게 찍어낼 수 있다는 이야기를 들어, 아침에 일어나 “폰”으로 ChatGPT 에게 물어보았다, 아니 여쭤보았다:
how to accept a nvmeof connection with a tcp socket? can i have an example?
폰으로 입력했기 때문에 대소문자 구분이 귀찮았다. 어쨌든 그랬더니, 내가 읽는 속도보다도 빠르게 줄줄 NVMe-oF 스펙을 읊으면서 예제 코드를 주는데 결과는 기대 이상으로 좋았고 몇번의 문답으로 원하는 형태로 코드를 바꿔주는데, 코드가 길어지면 리팩토링도 하면서 코드를 보여주어서 실시간 학습이 되면서 특히 재미가 있었다! 최종 코드가 컴파일은 되지 않아 몇 줄 정도 수동으로 바꾸어 주었어야 했는데, 이건 RUST/tokio async/await 쪽 변경점과 관련된 것 처럼 보여, 예전에 학습했던 개발자라면 마찬가지로 겪는 일이었을 것 같아 그것도 대단해 보였다. 결과적으로 이 포스팅은, 처음 ChatGPT와 대화를 시작하고 나서, 1시간 내에 publish까지 하게 되는 것 같다.
아래 예제는, 일단 컴파일은 되는 하지만 기능 검증은 하지 않은 코드이다. 하지만, NVMe-oF target 개발을 염두에 두었다면, 훌륭한 시작점이 될 것임을 느낄 수 있을 것이다. 다시 언급하지만, 빌드 수정 서너줄 빼면, 그대로 copy&paste를 한 코드이다. 이 내용으로 포스팅을 몇 편 이어가 보려 한다.
Example 1
use std::collections::HashMap;
use std::convert::TryInto;
use std::error::Error;
use std::io::{Read, Write};
use tokio::net::{TcpListener, TcpStream};
// Define a struct to represent an I/O queue
struct IoQueue {
nsid: u32,
cqid: u32,
qsize: u32,
}
impl IoQueue {
fn new(nsid: u32, cqid: u32, qsize: u32) -> Self {
Self { nsid, cqid, qsize }
}
}
async fn handle_client(mut stream_tokio: TcpStream, active_queues: &mut HashMap<u16, IoQueue>) -> Result<(), Box<dyn Error>> {
// Read the command header
let mut header = [0u8; 28];
let mut stream = stream_tokio.into_std()?;
stream.read_exact(&mut header)?;
// Parse the command header
let opcode = header[0];
let flags = header[1];
let cid = u32::from_le_bytes(header[4..8].try_into().unwrap());
let nsid = u32::from_le_bytes(header[8..12].try_into().unwrap());
let qid = u16::from_le_bytes(header[14..16].try_into().unwrap());
let sqsize = u16::from_le_bytes(header[16..18].try_into().unwrap());
let cqsize = u16::from_le_bytes(header[18..20].try_into().unwrap());
let maxdata = u32::from_le_bytes(header[20..24].try_into().unwrap());
let assocqid = u16::from_le_bytes(header[24..26].try_into().unwrap());
let reserved = u16::from_le_bytes(header[26..28].try_into().unwrap());
// Handle the command based on its opcode
match opcode {
0x00 => {
// Create Connection command
if active_queues.contains_key(&qid) {
// Queue already exists
let response = [0x00, 0x80, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00];
stream.write_all(&response)?;
} else {
// Create new I/O queue
let queue = IoQueue::new(nsid, cid, sqsize as u32);
active_queues.insert(qid, queue);
let response = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
stream.write_all(&response)?;
}
}
_ => {
// Unsupported command
let response = [0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
stream.write_all(&response)?;
}
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Listen for incoming connections
let listener = TcpListener::bind("127.0.0.1:8420").await?;
println!("Listening on {}", listener.local_addr()?);
// Keep track of active I/O queues
let mut active_queues = HashMap::new();
// Accept and handle incoming connections
loop {
let (stream, _) = listener.accept().await?;
println!("Accepted connection from {}", stream.peer_addr()?);
if let Err(e) = handle_client(stream, &mut active_queues).await {
println!("Error: {:?}", e);
}
}
}
Example 2
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
const NVME_TCP_PORT: u16 = 4420;
#[derive(Debug)]
enum Opcode {
Connect,
CreateQueue,
Invalid,
}
#[derive(Debug)]
enum NvmeofCommand {
Connect {
hostnqn: String,
subnqn: String,
},
CreateQueue {
qid: u16,
qsize: u16,
cqsize: u16,
prio: u8,
},
Invalid,
}
fn parse_opcode(opcode: u8) -> Opcode {
match opcode {
0x00 => Opcode::Connect,
0x01 => Opcode::CreateQueue,
_ => Opcode::Invalid,
}
}
fn parse_nvmeof_command(opcode: u8, buf: &[u8]) -> NvmeofCommand {
match opcode {
0x00 => {
let hostnqn = String::from_utf8_lossy(&buf[..buf.len() - 1]).to_string();
let subnqn = String::from_utf8_lossy(&buf[hostnqn.len() + 1..]).to_string();
NvmeofCommand::Connect { hostnqn, subnqn }
}
0x01 => {
let qid = u16::from_be_bytes([buf[0], buf[1]]);
let qsize = u16::from_be_bytes([buf[2], buf[3]]);
let cqsize = u16::from_be_bytes([buf[4], buf[5]]);
let prio = buf[6];
NvmeofCommand::CreateQueue { qid, qsize, cqsize, prio }
}
_ => NvmeofCommand::Invalid,
}
}
fn handle_nvmeof_command(stream: &mut TcpStream, opcode: u8, length: u32) {
let mut buf = vec![0u8; length as usize];
stream.read_exact(&mut buf).unwrap();
let cmd = parse_nvmeof_command(opcode, &buf);
match cmd {
NvmeofCommand::Connect { hostnqn, subnqn } => {
println!("Received Connect Command:");
println!("Host NQN: {}", hostnqn);
println!("Subsystem NQN: {}", subnqn);
}
NvmeofCommand::CreateQueue { qid, qsize, cqsize, prio } => {
println!("Received Create Queue Command:");
println!("Queue ID: {}", qid);
println!("Queue Size: {}", qsize);
println!("Completion Queue Size: {}", cqsize);
println!("Priority: {}", prio);
}
NvmeofCommand::Invalid => {
println!("Invalid Command Received");
}
}
}
fn main() {
let listener = TcpListener::bind(("127.0.0.1", NVME_TCP_PORT)).unwrap();
println!("NVMe over Fabrics Target Listening on port {}", NVME_TCP_PORT);
for stream in listener.incoming() {
match stream {
Ok(mut stream) => {
let mut header = [0u8; 16];
stream.read_exact(&mut header).unwrap();
let opcode = header[0];
let _flags = header[1];
let length = u32::from_be_bytes([
header[4], header[5], header[6], header[7]
]);
handle_nvmeof_command(&mut stream, opcode, length);
},
Err(e) => {
// do something
},
}
}
}