Skip to content

Commit

Permalink
feat: tmp io_uring test
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyoucao577 committed May 6, 2023
1 parent c2cbf1d commit 035bab6
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 0 deletions.
3 changes: 3 additions & 0 deletions misc/asyncio/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

build
.cache
13 changes: 13 additions & 0 deletions misc/asyncio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

cmake_minimum_required(VERSION 3.22)

project(asyncio)

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_STANDARD 20)

find_package(PkgConfig REQUIRED)


add_subdirectory(io_uring_echo_server)

11 changes: 11 additions & 0 deletions misc/asyncio/io_uring_echo_server/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

project(io_uring_echo_server)

set(IO_URING_ECHO_SERVER_TARGET_NAME echo_server)

aux_source_directory(. SRCS)
add_executable(${IO_URING_ECHO_SERVER_TARGET_NAME} ${SRCS})

# import liburing
pkg_check_modules(LIBURING REQUIRED IMPORTED_TARGET liburing)
target_link_libraries(${IO_URING_ECHO_SERVER_TARGET_NAME} PRIVATE PkgConfig::LIBURING)
40 changes: 40 additions & 0 deletions misc/asyncio/io_uring_echo_server/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

#include "tcp_server.h"
#include <cassert>
#include <iostream>

int main() {

auto tcpServer = TcpServer(8888);
tcpServer.StartListen();

while (true) {
struct io_uring_cqe *cqe{nullptr};
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret != 0) {
std::cout << "io_uring_wait_cqe failed, err " << ret << std::endl;
break;
}

auto req = static_cast<IORequest *>(io_uring_cqe_get_data(cqe));

if (cqe->res < 0) {
// handle error
std::cout << "cqe result err " << cqe->res << std::endl;
if (req) {
delete req;
}
continue;
}

// handle response

if (req) {
delete req;
}

io_uring_cqe_seen(&ring, cqe);
}

return -1;
}
63 changes: 63 additions & 0 deletions misc/asyncio/io_uring_echo_server/tcp_server.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

#include "tcp_server.h"
#include <liburing.h>

int TcpServer::StartListen() noexcept {
auto ret = 0;

int reuse = 1;
ret =
setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(int));
if (ret != 0) {
std::cout << "setsockopt SO_REUSEADDR failed, err " << ret << std::endl;
return ret;
}

struct sockaddr_in listen_addr;
listen_addr.sin_family = AF_INET;
listen_addr.sin_port = htons(port_);
listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);

ret = bind(sock_, (struct sockaddr *)&listen_addr, sizeof(listen_addr));
if (ret != 0) {
std::cout << "bind failed, err " << ret << std::endl;
return ret;
}

ret = listen(sock_, 100);
if (ret != 0) {
std::cout << "listen failed, err " << ret << std::endl;
return ret;
}

ret = io_uring_queue_init(100, &ring_, 0);
if (ret != 0) {
std::cout << "io_uring_queue_init failed, err " << ret << std::endl;
return ret;
}

return ret;
}

void TcpServer::AddAcceptRequest() {

struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
struct sockaddr_in peer_addr;
socklen_t peer_addr_len = sizeof(peer_addr);
io_uring_prep_accept(sqe, sock_, (struct sockaddr *)&peer_addr,
&peer_addr_len, 0);

auto req = IORequest{.fields = {.type = RequestType::Accept}};
io_uring_sqe_set_data(sqe, (void *)req.raw);
}

void TcpServer::AddReadRequest() {

struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);

io_uring_prep_read(sqe, sock_, (struct sockaddr *)&peer_addr, &peer_addr_len,
0);

auto req = IORequest{.fields = {.type = RequestType::Accept}};
io_uring_sqe_set_data(sqe, (void *)req.raw);
}
56 changes: 56 additions & 0 deletions misc/asyncio/io_uring_echo_server/tcp_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

#pragma once

#include <arpa/inet.h>
#include <asm-generic/int-ll64.h>
#include <iostream>
#include <liburing.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

enum class RequestType : unsigned char {
Accept = 0,
Read,
Write,
Close,
};

union IORequest { // align to 64 bits, the tricky to avoid memory allocation
struct {
int fd; // 4 bytes
RequestType type; // 1 byte
int buff_id : 24; // 3 bytes
} fields;
__u64 raw;
};
static_assert(sizeof(IORequest) == sizeof(__u64),
"sizeof(IORequest) == sizeof(__u64) failed");

class TcpServer {
public:
TcpServer(unsigned short port) : port_(port) {
sock_ = socket(AF_INET, SOCK_STREAM, 0);
}
TcpServer(const TcpServer &) = delete;
TcpServer(TcpServer &&) = delete;
~TcpServer() {
close(sock_);
io_uring_queue_exit(&ring_);
}

int sock() const noexcept { return sock_; }

int StartListen() noexcept;

// io_uring
void AddAcceptRequest();
void AddReadRequest();
void AddWriteRequest();
int Submit();

private:
unsigned short port_{0};
int sock_{-1};
struct io_uring ring_;
};

0 comments on commit 035bab6

Please sign in to comment.