From c2e6f71f01badce5aa87f749bfd5f7791cc6cf0e Mon Sep 17 00:00:00 2001 From: Zhennan Tu <17048267+numbaa@users.noreply.github.com> Date: Sun, 8 Oct 2023 21:57:00 +0800 Subject: [PATCH] Merge branch rtc2 (#35) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 添加编译选项选择tcp,rtc,rtc2 * rtc2 test 1 * rtc2 test 2 * rtc2 test 3 * rtc2 test 4 p2p✅ --- .gitignore | 1 + CMakeLists.txt | 12 ++- README.md | 2 +- app/src/main.cpp | 13 ++- lanthing/CMakeLists.txt | 1 + lanthing/src/client/client.cpp | 96 +++++++++++++------ lanthing/src/client/client.h | 3 + lanthing/src/main.cpp | 29 ++++-- .../src/service/workers/worker_session.cpp | 90 ++++++++++++----- lanthing/src/service/workers/worker_session.h | 6 +- lanthing/src/worker/worker.cpp | 2 +- options-default.cmake | 3 +- .../rtc2/src/connection/connection_impl.cpp | 18 ++-- .../rtc2/src/connection/connection_impl.h | 1 + .../rtc2/src/modules/dtls/key_and_cert.cpp | 6 +- transport/rtc2/src/modules/dtls/mbed_dtls.cpp | 9 +- .../rtc2/src/modules/network/address.cpp | 20 ++-- transport/rtc2/src/modules/network/address.h | 2 + .../src/modules/network/network_channel.cpp | 31 ++++++ .../rtc2/src/modules/network/udp_socket.cpp | 21 +++- transport/rtc2/src/modules/p2p/endpoint.cpp | 39 ++++++-- transport/rtc2/src/modules/p2p/endpoint.h | 1 + .../rtc2/src/modules/p2p/lan_endpoint.cpp | 17 ++-- transport/rtc2/src/modules/p2p/lan_endpoint.h | 2 +- transport/rtc2/src/modules/p2p/netcard.cpp | 9 +- transport/rtc2/src/modules/p2p/p2p.cpp | 1 + .../rtc2/src/modules/p2p/stuns/easy_stun.cpp | 4 + .../rtc2/src/modules/p2p/stuns/easy_stun.h | 2 + .../rtc2/src/modules/p2p/stuns/message.h | 6 ++ transport/rtc2/src/rtc2.cpp | 10 +- 30 files changed, 336 insertions(+), 121 deletions(-) diff --git a/.gitignore b/.gitignore index a3e256f..1e0c08f 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,7 @@ /.vscode /.vs /build +/install CMakeSettings.json options-user.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index 0ab9b75..fc277eb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,9 @@ project(lanthing-pc) list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeRC.cmake) +set(LT_TRANSPORT_RTC 1) +set(LT_TRANSPORT_RTC2 2) +set(LT_TRANSPORT_TCP 3) if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/options-user.cmake) include(${CMAKE_CURRENT_SOURCE_DIR}/options-user.cmake) else() @@ -55,8 +58,13 @@ add_definitions(-DLT_SERVER_USE_SSL=$,true,false add_definitions(-DLT_RUN_AS_SERVICE=$,true,false>) add_definitions(-DLT_WIN_SERVICE_NAME=${LT_WIN_SERVICE_NAME}) add_definitions(-DLT_WIN_SERVICE_DISPLAY_NAME=${LT_WIN_SERVICE_DISPLAY_NAME}) -# -add_definitions(-DLT_USE_LTRTC=$,true,false>) +#transport +add_definitions(-DLT_TRANSPORT_RTC=${LT_TRANSPORT_RTC}) +add_definitions(-DLT_TRANSPORT_RTC2=${LT_TRANSPORT_RTC2}) +add_definitions(-DLT_TRANSPORT_TCP=${LT_TRANSPORT_TCP}) +add_definitions(-DLT_TRANSPORT_TYPE=${LT_TRANSPORT_TYPE}) +#misc +add_definitions(-DLT_CRASH_ON_THREAD_HANGS=$,true,false>) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/certs) diff --git a/README.md b/README.md index bea2993..b54b611 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,6 @@ $> ./build.ps1 build [Debug|Release] ``` 或者: ``` -$> cmake -B build/[Debug|Release] --DCMAKE_BUILD_TYPE=[Debug|Release] -DCMAKE_INSTALL_PREFIX=install/[Debug|Release] +$> cmake -B build/[Debug|Release] -DCMAKE_BUILD_TYPE=[Debug|Release] -DCMAKE_INSTALL_PREFIX=install/[Debug|Release] $> cmake --build build/[Debug|Release] --config [Debug|Release] --target install ``` diff --git a/app/src/main.cpp b/app/src/main.cpp index c228058..3e4674f 100644 --- a/app/src/main.cpp +++ b/app/src/main.cpp @@ -62,6 +62,10 @@ void sigint_handler(int) { std::terminate(); } +void terminateCallback(const std::string& last_word) { + LOG(INFO) << "Last words: " << last_word; +} + void initLogging() { std::string bin_path = ltlib::getProgramFullpath(); std::string bin_dir = ltlib::getProgramPath(); @@ -88,13 +92,18 @@ void initLogging() { g3::only_change_at_initialization::addLogLevel(ERR); g3::log_levels::disable(DEBUG); g3::initializeLogging(g_logWorker.get()); - ltlib::ThreadWatcher::instance()->registerTerminateCallback( - [](const std::string& last_word) { LOG(INFO) << "Last words: " << last_word; }); LOG(INFO) << "Log system initialized"; g_minidumpGenertator = std::make_unique(log_dir.string()); signal(SIGINT, sigint_handler); + if (LT_CRASH_ON_THREAD_HANGS) { + ltlib::ThreadWatcher::instance()->enableCrashOnTimeout(); + ltlib::ThreadWatcher::instance()->registerTerminateCallback(terminateCallback); + } + else { + ltlib::ThreadWatcher::instance()->disableCrashOnTimeout(); + } } } // namespace diff --git a/lanthing/CMakeLists.txt b/lanthing/CMakeLists.txt index c37aa7a..ac7b4fa 100644 --- a/lanthing/CMakeLists.txt +++ b/lanthing/CMakeLists.txt @@ -182,6 +182,7 @@ target_link_libraries(${PROJECT_NAME} ltlib ltproto rtc + nbp2p transport_api transport #fonts diff --git a/lanthing/src/client/client.cpp b/lanthing/src/client/client.cpp index a241811..728f83e 100644 --- a/lanthing/src/client/client.cpp +++ b/lanthing/src/client/client.cpp @@ -49,11 +49,9 @@ #include #include -#if LT_USE_LTRTC -#include -#else // LT_USE_LTRTC +#include +#include #include -#endif // LT_USE_LTRTC namespace { @@ -74,11 +72,17 @@ lt::VideoCodecType to_ltrtc(std::string codec_str) { } lt::AudioCodecType atype() { -#if LT_USE_LTRTC - return lt::AudioCodecType::PCM; -#else - return lt::AudioCodecType::OPUS; -#endif + switch (LT_TRANSPORT_TYPE) { + case LT_TRANSPORT_RTC: + return lt::AudioCodecType::PCM; + case LT_TRANSPORT_RTC2: + return lt::AudioCodecType::PCM; + case LT_TRANSPORT_TCP: + return lt::AudioCodecType::OPUS; + default: + LOG(FATAL) << "Unknown transport type"; + return lt::AudioCodecType::OPUS; + } } } // namespace @@ -356,6 +360,8 @@ void Client::onSignalingMessage(std::shared_ptr _ case ltproto::signaling::SignalingMessage::Rtc: { auto& rtc_msg = msg->rtc_message(); + LOG(INFO) << "Received signaling key:" << msg->rtc_message().key().c_str() + << ", value:" << msg->rtc_message().value().c_str(); tp_client_->onSignalingMessage(rtc_msg.key().c_str(), rtc_msg.value().c_str()); break; } @@ -380,8 +386,47 @@ void Client::onSignalingMessageAck(std::shared_ptrconnect()) { + LOG(INFO) << "lt::tp::Client connect failed"; + return false; + } + return true; +} + +std::unique_ptr Client::createTcpClient() { + namespace ph = std::placeholders; + lt::tp::ClientTCP::Params params{}; + params.on_data = std::bind(&Client::onTpData, this, ph::_1, ph::_2, ph::_3); + params.on_video = std::bind(&Client::onTpVideoFrame, this, ph::_1); + params.on_audio = std::bind(&Client::onTpAudioData, this, ph::_1); + params.on_connected = std::bind(&Client::onTpConnected, this); + params.on_failed = std::bind(&Client::onTpFailed, this); + params.on_disconnected = std::bind(&Client::onTpDisconnected, this); + params.on_signaling_message = std::bind(&Client::onTpSignalingMessage, this, ph::_1, ph::_2); + params.video_codec_type = video_params_.codec_type; + return lt::tp::ClientTCP::create(params); +} + +std::unique_ptr Client::createRtcClient() { namespace ph = std::placeholders; -#if LT_USE_LTRTC rtc::Client::Params params{}; params.use_nbp2p = true; std::vector reflex_servers; @@ -413,29 +458,26 @@ bool Client::initTransport() { params.video_codec_type = video_params_.codec_type; params.audio_channels = audio_params_.channels; params.audio_sample_rate = audio_params_.frames_per_second; - tp_client_ = rtc::Client::create(std::move(params)); -#else // LT_USE_LTRTC - lt::tp::ClientTCP::Params params{}; + return rtc::Client::create(std::move(params)); +} + +std::unique_ptr Client::createRtc2Client() { + namespace ph = std::placeholders; + rtc2::Client::Params params{}; params.on_data = std::bind(&Client::onTpData, this, ph::_1, ph::_2, ph::_3); params.on_video = std::bind(&Client::onTpVideoFrame, this, ph::_1); params.on_audio = std::bind(&Client::onTpAudioData, this, ph::_1); params.on_connected = std::bind(&Client::onTpConnected, this); + params.on_conn_changed = std::bind(&Client::onTpConnChanged, this); params.on_failed = std::bind(&Client::onTpFailed, this); params.on_disconnected = std::bind(&Client::onTpDisconnected, this); - params.onSignalingMessage = std::bind(&Client::onTpSignalingMessage, this, ph::_1, ph::_2); - params.video_codec_type = video_params_.codec_type; - tp_client_ = lt::tp::ClientTCP::create(params); -#endif // LT_USE_LTRTC - - if (tp_client_ == nullptr) { - LOG(INFO) << "Create rtc client failed"; - return false; - } - if (!tp_client_->connect()) { - LOG(INFO) << "LTClient connect failed"; - return false; - } - return true; + params.on_signaling_message = std::bind(&Client::onTpSignalingMessage, this, ph::_1, ph::_2); + params.audio_recv_ssrc = 687154681; + params.video_recv_ssrc = 541651314; + // TODO: key and cert合理的创建时机 + params.key_and_cert = rtc2::KeyAndCert::create(); + params.remote_digest; + return rtc2::Client::create(params); } void Client::onTpData(const uint8_t* data, uint32_t size, bool is_reliable) { diff --git a/lanthing/src/client/client.h b/lanthing/src/client/client.h index cd0ea0b..cea6ef3 100644 --- a/lanthing/src/client/client.h +++ b/lanthing/src/client/client.h @@ -113,6 +113,9 @@ private: // transport bool initTransport(); + std::unique_ptr createTcpClient(); + std::unique_ptr createRtcClient(); + std::unique_ptr createRtc2Client(); void onTpData(const uint8_t* data, uint32_t size, bool is_reliable); void onTpVideoFrame(const lt::VideoFrame& frame); void onTpAudioData(const lt::AudioData& audio_data); diff --git a/lanthing/src/main.cpp b/lanthing/src/main.cpp index 3f7e60b..4b56ade 100644 --- a/lanthing/src/main.cpp +++ b/lanthing/src/main.cpp @@ -50,9 +50,9 @@ #include #endif -#if LT_USE_LTRTC +#if LT_TRANSPORT_TYPE == LT_TRANSPORT_RTC #include -#endif // LT_USE_LTRTC +#endif #include "firewall.h" @@ -82,6 +82,10 @@ void sigint_handler(int) { ::exit(0); } +void terminateCallback(const std::string& last_word) { + LOG(INFO) << "Last words: " << last_word; +} + void initLogAndMinidump(Role role) { std::string prefix; std::string rtc_prefix; @@ -122,24 +126,29 @@ void initLogAndMinidump(Role role) { } } g_log_worker = g3::LogWorker::createLogWorker(); - g_log_worker->addSink(std::make_unique(prefix, log_dir.string()), - <lib::LogSink::fileWrite); + g_log_worker->addSink( + std::make_unique(prefix, log_dir.string(), 30 /*flush_every_30_logs*/), + <lib::LogSink::fileWrite); g3::log_levels::disable(DEBUG); g3::only_change_at_initialization::addLogLevel(ERR); g3::initializeLogging(g_log_worker.get()); - ltlib::ThreadWatcher::instance()->registerTerminateCallback( - [](const std::string& last_word) { LOG(INFO) << "Last words: " << last_word; }); if ((role == Role::Service || role == Role::Client) && !rtc_prefix.empty()) { -#if LT_USE_LTRTC +#if LT_TRANSPORT_TYPE == LT_TRANSPORT_RTC rtc::initLogging(log_dir.string().c_str(), rtc_prefix.c_str()); -#endif // LT_USE_LTRTC +#endif } LOG(INFO) << "Log system initialized"; // g3log必须再minidump前初始化 g_minidump_genertator = std::make_unique(log_dir.string()); signal(SIGINT, sigint_handler); - // ltlib::ThreadWatcher::instance()->disable_crash_on_timeout(); + if (LT_CRASH_ON_THREAD_HANGS) { + ltlib::ThreadWatcher::instance()->enableCrashOnTimeout(); + ltlib::ThreadWatcher::instance()->registerTerminateCallback(terminateCallback); + } + else { + ltlib::ThreadWatcher::instance()->disableCrashOnTimeout(); + } } std::map parseOptions(int argc, char* argv[]) { @@ -244,7 +253,7 @@ int main(int argc, char* argv[]) { } else if (iter->second == "client") { // 方便调试attach - // std::this_thread::sleep_for(std::chrono::seconds { 15 }); + // std::this_thread::sleep_for(std::chrono::seconds{15}); return runAsClient(options); } else if (iter->second == "worker") { diff --git a/lanthing/src/service/workers/worker_session.cpp b/lanthing/src/service/workers/worker_session.cpp index a67bb79..d8aeac5 100644 --- a/lanthing/src/service/workers/worker_session.cpp +++ b/lanthing/src/service/workers/worker_session.cpp @@ -55,11 +55,9 @@ #include #include -#if LT_USE_LTRTC -#include -#else // LT_USE_LTRTC +#include +#include #include -#endif // LT_USE_LTRTC #include "worker_process.h" #include @@ -207,11 +205,51 @@ bool WorkerSession::init(std::shared_ptr _msg) { return true; } -bool WorkerSession::initRtcServer() { +bool WorkerSession::initTransport() { + switch (LT_TRANSPORT_TYPE) { + case LT_TRANSPORT_TCP: + tp_server_ = createTcpServer(); + break; + case LT_TRANSPORT_RTC: + tp_server_ = createRtcServer(); + break; + case LT_TRANSPORT_RTC2: + tp_server_ = createRtc2Server(); + break; + default: + break; + } + if (tp_server_ == nullptr) { + LOG(ERR) << "Create transport server failed"; + return false; + } + else { + rtc_closed_ = false; + return true; + } +} + +std::unique_ptr WorkerSession::createTcpServer() { + namespace ph = std::placeholders; auto negotiated_params = std::static_pointer_cast(negotiated_streaming_params_); + lt::tp::ServerTCP::Params params{}; + params.video_codec_type = ::to_ltrtc( + static_cast(negotiated_params->video_codecs().Get(0))); + params.on_failed = std::bind(&WorkerSession::onTpFailed, this); + params.on_disconnected = std::bind(&WorkerSession::onTpDisconnected, this); + params.on_accepted = std::bind(&WorkerSession::onTpAccepted, this); + params.on_data = std::bind(&WorkerSession::onTpData, this, ph::_1, ph::_2, ph::_3); + params.on_signaling_message = + std::bind(&WorkerSession::onTpSignalingMessage, this, ph::_1, ph::_2); + return lt::tp::ServerTCP::create(params); +} + +std::unique_ptr WorkerSession::createRtcServer() { namespace ph = std::placeholders; -#if LT_USE_LTRTC + auto negotiated_params = + std::static_pointer_cast(negotiated_streaming_params_); + rtc::Server::Params params{}; params.use_nbp2p = true; std::vector reflex_servers; @@ -253,27 +291,31 @@ bool WorkerSession::initRtcServer() { params.on_video_bitrate_update = std::bind(&WorkerSession::onTpEesimatedVideoBitreateUpdate, this, ph::_1); params.on_loss_rate_update = std::bind(&WorkerSession::onTpLossRateUpdate, this, ph::_1); - tp_server_ = rtc::Server::create(std::move(params)); -#else // LT_USE_LTRTC - lt::tp::ServerTCP::Params params{}; - params.video_codec_type = ::to_ltrtc(negotiated_params->video_codecs().Get(0)); + return rtc::Server::create(std::move(params)); +} + +std::unique_ptr WorkerSession::createRtc2Server() { + namespace ph = std::placeholders; + rtc2::Server::Params params{}; + params.on_accepted = std::bind(&WorkerSession::onTpAccepted, this); params.on_failed = std::bind(&WorkerSession::onTpFailed, this); params.on_disconnected = std::bind(&WorkerSession::onTpDisconnected, this); - params.on_accepted = std::bind(&WorkerSession::onTpAccepted, this); + params.on_conn_changed = std::bind(&WorkerSession::onTpConnChanged, this); params.on_data = std::bind(&WorkerSession::onTpData, this, ph::_1, ph::_2, ph::_3); - params.onSignalingMessage = + params.on_signaling_message = std::bind(&WorkerSession::onTpSignalingMessage, this, ph::_1, ph::_2); - tp_server_ = lt::tp::ServerTCP::create(params); -#endif // LT_USE_LTRTC - - if (tp_server_ == nullptr) { - LOG(ERR) << "Create rtc server failed"; - return false; - } - else { - rtc_closed_ = false; - return true; + params.on_keyframe_request = std::bind(&WorkerSession::onTpRequestKeyframe, this); + params.on_video_bitrate_update = + std::bind(&WorkerSession::onTpEesimatedVideoBitreateUpdate, this, ph::_1); + params.on_loss_rate_update = std::bind(&WorkerSession::onTpLossRateUpdate, this, ph::_1); + params.remote_digest; + params.key_and_cert = rtc2::KeyAndCert::create(); + if (params.key_and_cert == nullptr) { + return nullptr; } + params.video_send_ssrc = 541651314; + params.audio_send_ssrc = 687154681; + return rtc2::Server::create(params); } void WorkerSession::mainLoop(const std::function& i_am_alive) { @@ -337,7 +379,7 @@ void WorkerSession::maybeOnCreateSessionCompleted() { if (negotiated_streaming_params_ == nullptr) { return; } - if (!initRtcServer()) { + if (!initTransport()) { on_create_session_completed_(false, session_name_, empty_params); return; } @@ -456,6 +498,8 @@ void WorkerSession::onSignalingMessageAck(std::shared_ptr _msg) { auto msg = std::static_pointer_cast(_msg); + LOG(INFO) << "Received signaling key:" << msg->rtc_message().key().c_str() + << ", value:" << msg->rtc_message().value().c_str(); tp_server_->onSignalingMessage(msg->rtc_message().key().c_str(), msg->rtc_message().value().c_str()); } diff --git a/lanthing/src/service/workers/worker_session.h b/lanthing/src/service/workers/worker_session.h index 583f8db..6304f9c 100644 --- a/lanthing/src/service/workers/worker_session.h +++ b/lanthing/src/service/workers/worker_session.h @@ -79,14 +79,16 @@ private: on_create_completed, std::function on_closed); bool init(std::shared_ptr msg); - bool initRtcServer(); + bool initTransport(); + std::unique_ptr createTcpServer(); + std::unique_ptr createRtcServer(); + std::unique_ptr createRtc2Server(); void createWorkerProcess(uint32_t client_width, uint32_t client_height, uint32_t client_refresh_rate, std::vector client_codecs); void mainLoop(const std::function& i_am_alive); void onClosed(CloseReason reason); void maybeOnCreateSessionCompleted(); - bool createVideoEncoder(); void postTask(const std::function& task); void postDelayTask(int64_t delay_ms, const std::function& task); diff --git a/lanthing/src/worker/worker.cpp b/lanthing/src/worker/worker.cpp index e4e97c1..101f9b0 100644 --- a/lanthing/src/worker/worker.cpp +++ b/lanthing/src/worker/worker.cpp @@ -213,7 +213,6 @@ bool Worker::init() { } } - ioloop_->postDelay(500 /*ms*/, std::bind(&Worker::checkCimeout, this)); std::promise promise; auto future = promise.get_future(); thread_ = ltlib::BlockingThread::create( @@ -222,6 +221,7 @@ bool Worker::init() { mainLoop(i_am_alive); }); future.get(); + ioloop_->postDelay(500 /*ms*/, std::bind(&Worker::checkCimeout, this)); return true; } // namespace lt diff --git a/options-default.cmake b/options-default.cmake index ac2c1ad..ce5d74a 100644 --- a/options-default.cmake +++ b/options-default.cmake @@ -9,4 +9,5 @@ set(LT_SERVER_USE_SSL OFF) set(LT_RUN_AS_SERVICE OFF) set(LT_WIN_SERVICE_NAME Lanthing) set(LT_WIN_SERVICE_DISPLAY_NAME "Lanthing Service") -set(LT_USE_LTRTC ON) \ No newline at end of file +set(LT_TRANSPORT_TYPE ${LT_TRANSPORT_RTC}) +set(LT_CRASH_ON_THREAD_HANGS ON) \ No newline at end of file diff --git a/transport/rtc2/src/connection/connection_impl.cpp b/transport/rtc2/src/connection/connection_impl.cpp index 2129c29..2e192ab 100644 --- a/transport/rtc2/src/connection/connection_impl.cpp +++ b/transport/rtc2/src/connection/connection_impl.cpp @@ -166,6 +166,7 @@ bool ConnectionImpl::init() { // message channel MessageChannel::Params msg_param{}; + msg_param.network_channel = network_channel_.get(); msg_param.reliable_ssrc = 0; msg_param.half_reliable_ssrc = 0; msg_param.mtu = 1400; @@ -191,15 +192,12 @@ bool ConnectionImpl::init() { if (dtls_ == nullptr) { return false; } - return true; } void ConnectionImpl::start() { + started_ = true; network_channel_->start(); - // TODO: 改成连接成功建立后post - send_thread_->post_delay(ltlib::TimeDelta{1000} /*us*/, - std::bind(&Pacer::process, pacer_.get(), pacer_->weak_from_this())); } bool ConnectionImpl::sendData(const uint8_t* data, uint32_t size) { @@ -231,7 +229,7 @@ void ConnectionImpl::onSignalingMessage(const std::string& key, const std::strin LOG(ERR) << "Received unknown signaling message key:" << key; return; } - std::istringstream iss; + std::istringstream iss(value); std::string key1, key2, type, addr; iss >> key1; iss >> type; @@ -252,6 +250,10 @@ void ConnectionImpl::onSignalingMessage(const std::string& key, const std::strin LOG(ERR) << "Invalid address " << addr; return; } + if (params_.is_server && !started_) { + started_ = true; + network_channel_->start(); + } network_channel_->addRemoteInfo(info); } @@ -323,15 +325,13 @@ void ConnectionImpl::onDtlsPacket(const uint8_t* data, uint32_t size, int64_t ti } void ConnectionImpl::onDtlsConnected() { - // LOG(INFO) << ""; + LOG(INFO) << "Connected"; } void ConnectionImpl::onDtlsDisconnected() { - // TODO: 整个断链 + LOG(INFO) << "Disconnected"; } -// 原本让EndpointInfo回调到上层,让上层按照自己的方式做序列化会更好 -// 但具体到这个项目,lanthing和rtc2是一体的,你我之间不必拿刻度尺分太清,怎么方便怎么来 void ConnectionImpl::onEndpointInfo(const EndpointInfo& info) { std::ostringstream oss; oss << FieldType << " " << to_str(info.type) << " " << FieldAddr << " " diff --git a/transport/rtc2/src/connection/connection_impl.h b/transport/rtc2/src/connection/connection_impl.h index acfe7a2..fdf6b9b 100644 --- a/transport/rtc2/src/connection/connection_impl.h +++ b/transport/rtc2/src/connection/connection_impl.h @@ -85,6 +85,7 @@ private: std::vector> audio_receive_streams_; std::shared_ptr message_channel_; std::shared_ptr dtls_; + std::atomic started_ = false; }; } // namespace rtc2 \ No newline at end of file diff --git a/transport/rtc2/src/modules/dtls/key_and_cert.cpp b/transport/rtc2/src/modules/dtls/key_and_cert.cpp index aeca46d..306a4b1 100644 --- a/transport/rtc2/src/modules/dtls/key_and_cert.cpp +++ b/transport/rtc2/src/modules/dtls/key_and_cert.cpp @@ -89,9 +89,9 @@ bool KeyAndCertImpl::createInternal() { }); mbedtls_entropy_init(entropy.get()); mbedtls_ctr_drbg_init(drbg.get()); - std::unique_ptr seed{new uint8_t[MBEDTLS_ENTROPY_MAX_SEED_SIZE]}; - int ret = mbedtls_ctr_drbg_seed(drbg.get(), mbedtls_entropy_func, entropy.get(), seed.get(), - MBEDTLS_ENTROPY_MAX_SEED_SIZE); + std::unique_ptr seed{new uint8_t[128]}; + int ret = + mbedtls_ctr_drbg_seed(drbg.get(), mbedtls_entropy_func, entropy.get(), seed.get(), 128); if (ret != 0) { LOG(ERR) << "mbedtls_ctr_drbg_seed failed " << ret; return false; diff --git a/transport/rtc2/src/modules/dtls/mbed_dtls.cpp b/transport/rtc2/src/modules/dtls/mbed_dtls.cpp index 1b5b66f..19df079 100644 --- a/transport/rtc2/src/modules/dtls/mbed_dtls.cpp +++ b/transport/rtc2/src/modules/dtls/mbed_dtls.cpp @@ -109,7 +109,7 @@ bool MbedDtls::init() { if (!tls_init_context()) { return false; } - if (tls_init_engine()) { + if (!tls_init_engine()) { return false; } return true; @@ -238,7 +238,7 @@ bool MbedDtls::tls_init_context() { const mbedtls_ssl_ciphersuite_t* c = mbedtls_ssl_ciphersuite_from_string(name.c_str()); if (c != nullptr) { ciphersuites_.push_back(c->MBEDTLS_PRIVATE(id)); - printf("Adding ciphersuite (%#x) %s\n", c->MBEDTLS_PRIVATE(id), name.c_str()); + LOGF(DEBUG, "Adding ciphersuite (%#x) %s\n", c->MBEDTLS_PRIVATE(id), name.c_str()); } } if (ciphersuites_.empty()) { @@ -251,9 +251,8 @@ bool MbedDtls::tls_init_context() { mbedtls_ssl_conf_min_version(&ssl_cfg_, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3); mbedtls_ssl_conf_verify(&ssl_cfg_, &verify_cert, this); - std::unique_ptr seed{new uint8_t[MBEDTLS_ENTROPY_MAX_SEED_SIZE]}; - int ret = mbedtls_ctr_drbg_seed(&drbg_, mbedtls_entropy_func, &entropy_, seed.get(), - MBEDTLS_ENTROPY_MAX_SEED_SIZE); + std::unique_ptr seed{new uint8_t[128]}; + int ret = mbedtls_ctr_drbg_seed(&drbg_, mbedtls_entropy_func, &entropy_, seed.get(), 128); if (ret != 0) { LOG(ERR) << "mbedtls_ctr_drbg_seed failed " << ret; return false; diff --git a/transport/rtc2/src/modules/network/address.cpp b/transport/rtc2/src/modules/network/address.cpp index ec54c9f..3c4fd95 100644 --- a/transport/rtc2/src/modules/network/address.cpp +++ b/transport/rtc2/src/modules/network/address.cpp @@ -285,22 +285,30 @@ sockaddr_storage Address::to_storage() const { } Address Address::from_storage(const sockaddr_storage& storage) { + return from_storage(&storage); +} + +Address Address::from_storage(const sockaddr_storage* storage) { Address addr{}; - if (storage.ss_family == AF_INET) { - const sockaddr_in* in4 = reinterpret_cast(&storage); + if (storage->ss_family == AF_INET) { + const sockaddr_in* in4 = reinterpret_cast(storage); addr.family_ = AF_INET; - addr.port_ = in4->sin_port; + addr.port_ = ntohs(in4->sin_port); addr.ip_.v4 = in4->sin_addr; } - else if (storage.ss_family == AF_INET6) { - const sockaddr_in6* in6 = reinterpret_cast(&storage); + else if (storage->ss_family == AF_INET6) { + const sockaddr_in6* in6 = reinterpret_cast(storage); addr.family_ = AF_INET6; - addr.port_ = in6->sin6_port; + addr.port_ = ntohs(in6->sin6_port); addr.ip_.v6 = in6->sin6_addr; } return addr; } +Address Address::from_sockaddr(const sockaddr* sockaddr) { + return from_storage(reinterpret_cast(sockaddr)); +} + Address Address::from_str(const std::string& str) { if (str.at(0) == '[') { std::string::size_type closebracket = str.rfind(']'); diff --git a/transport/rtc2/src/modules/network/address.h b/transport/rtc2/src/modules/network/address.h index 16ed9f5..98cbb53 100644 --- a/transport/rtc2/src/modules/network/address.h +++ b/transport/rtc2/src/modules/network/address.h @@ -103,6 +103,8 @@ public: sockaddr_storage& to_storage(sockaddr_storage& storage) const; sockaddr_storage to_storage() const; static Address from_storage(const sockaddr_storage& storage); + static Address from_storage(const sockaddr_storage* storage); + static Address from_sockaddr(const sockaddr* sockaddr); static Address from_str(const std::string& str); private: diff --git a/transport/rtc2/src/modules/network/network_channel.cpp b/transport/rtc2/src/modules/network/network_channel.cpp index f16edbf..fd3b47e 100644 --- a/transport/rtc2/src/modules/network/network_channel.cpp +++ b/transport/rtc2/src/modules/network/network_channel.cpp @@ -39,9 +39,35 @@ namespace rtc2 { +// static void printNetworkAdapters() { +// uv_interface_address_t* info; +// int count; +// char buf[512]; +// uv_interface_addresses(&info, &count); +// for (int i = 0; i < count; i++) { +// uv_interface_address_t interface_a = info[i]; +// if (interface_a.address.address4.sin_family == AF_INET) { +// +// uv_ip4_name(&interface_a.address.address4, buf, sizeof(buf)); +// +// printf("IPv4 address: %s\n", buf); +// } +// +// else if (interface_a.address.address4.sin_family == AF_INET6) { +// +// uv_ip6_name(&interface_a.address.address6, buf, sizeof(buf)); +// +// printf("IPv6 address: %s\n", buf); +// } +// LOG(INFO) << interface_a.name << " " << interface_a.is_internal << " " << buf << " " +// << std::string(&interface_a.phys_addr[0], 6).c_str(); +// } +// } + NetworkChannel::NetworkChannel(const Params& p) : on_error_{p.on_error} , on_endpoint_info_gathered_{p.on_endpoint_info_gathered} { + // printNetworkAdapters(); P2P::Params params{}; params.is_server = p.is_server; params.network_channel = this; @@ -96,7 +122,12 @@ void NetworkChannel::setOnConnChanged( on_conn_changed_ = on_conn_changed; } +// 跑在用户线程 | 跑在网络线程 void NetworkChannel::addRemoteInfo(const EndpointInfo& info) { + if (ioloop_->isNotCurrentThread()) { + post(std::bind(&NetworkChannel::addRemoteInfo, this, info)); + return; + } p2p_->add_remote_info(info); } diff --git a/transport/rtc2/src/modules/network/udp_socket.cpp b/transport/rtc2/src/modules/network/udp_socket.cpp index b7aba55..236e922 100644 --- a/transport/rtc2/src/modules/network/udp_socket.cpp +++ b/transport/rtc2/src/modules/network/udp_socket.cpp @@ -69,7 +69,7 @@ std::shared_ptr UDPSocketImpl::create(ltlib::IOLoop* ioloop, return nullptr; } auto storage = bind_addr.to_storage(); - ret = uv_udp_bind(udp, reinterpret_cast(&storage), UV_UDP_REUSEADDR); + ret = uv_udp_bind(udp, reinterpret_cast(&storage), 0); if (ret != 0) { uv_close((uv_handle_t*)udp, [](uv_handle_t* handle) { auto udp = (uv_udp_t*)handle; @@ -78,6 +78,18 @@ std::shared_ptr UDPSocketImpl::create(ltlib::IOLoop* ioloop, LOG(ERR) << "uv_udp_bind failed with " << ret; return nullptr; } + sockaddr_storage local_storage{}; + int socksize = sizeof(local_storage); + ret = uv_udp_getsockname(udp, reinterpret_cast(&local_storage), &socksize); + if (ret != 0) { + uv_close((uv_handle_t*)udp, [](uv_handle_t* handle) { + auto udp = (uv_udp_t*)handle; + delete udp; + }); + LOG(ERR) << "uv_udp_getsockname failed with " << ret; + return nullptr; + } + Address local_addr = Address::from_storage(local_storage); ret = uv_udp_recv_start(udp, UDPSocketImpl::on_alloc_memory, UDPSocketImpl::on_udp_recv); if (ret != 0) { uv_close((uv_handle_t*)udp, [](uv_handle_t* handle) { @@ -90,7 +102,7 @@ std::shared_ptr UDPSocketImpl::create(ltlib::IOLoop* ioloop, auto udp_socket = std::make_unique(); udp->data = udp_socket.get(); udp_socket->udp_ = udp; - udp_socket->bind_addr_ = bind_addr; + udp_socket->bind_addr_ = local_addr; return udp_socket; } @@ -147,15 +159,14 @@ void UDPSocketImpl::on_udp_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* // 在我的使用场景里,这个flags似乎不用处理 (void)flags; auto that = reinterpret_cast(handle->data); - if (nread < 0) { + if (nread <= 0) { that->error_ = static_cast(nread); delete buf->base; // TODO: 通知错误 return; } if (that->on_read_ != nullptr) { - // 整个过程没有复制,IPv4情况下应该不会爆炸吧? - auto address = Address::from_storage(*reinterpret_cast(addr)); + auto address = Address::from_sockaddr(addr); that->on_read_(reinterpret_cast(buf->base), static_cast(nread), address, ltlib::steady_now_us()); } diff --git a/transport/rtc2/src/modules/p2p/endpoint.cpp b/transport/rtc2/src/modules/p2p/endpoint.cpp index 1074c40..b732d44 100644 --- a/transport/rtc2/src/modules/p2p/endpoint.cpp +++ b/transport/rtc2/src/modules/p2p/endpoint.cpp @@ -65,14 +65,6 @@ void Endpoint::post_delayed_task(uint32_t delayed_ms, const std::function&& socket, NetworkChannel* network_channel, std::function on_connected, std::function on_read) @@ -94,22 +86,43 @@ void Endpoint::send_binding_request(const Address& addr) { int ret = sock()->sendmsg({{msg.data(), msg.size()}}, addr); if (ret < 0) { int error = socket_->error(); - LOG(ERR) << "Send binding request to " << remote_.address.to_string() - << " failed with error " << error; + LOG(ERR) << "Send binding request to " << addr.to_string() << " failed with error " + << error; } post_delayed_task(100, std::bind(&Endpoint::send_binding_request, this, addr)); } +void Endpoint::send_binding_response(const Address& addr, const std::vector& id) { + StunMessage msg{StunMessage::Type::BindingResponse, + reinterpret_cast(id.data())}; + int ret = sock()->sendmsg({{msg.data(), msg.size()}}, addr); + if (ret < 0) { + int error = socket_->error(); + LOG(ERR) << "Send binding response to " << addr.to_string() << " failed with error " + << error; + } +} + +void Endpoint::add_remote_info(const EndpointInfo& info) { + if (remote_.type != EndpointType::Unknown) { + return; + } + remote_ = info; + send_binding_request(remote_.address); +} + bool Endpoint::connected() const { return received_request_ && received_response_; } void Endpoint::set_received_request() { received_request_ = true; + maybe_connected(); } void Endpoint::set_received_response() { received_response_ = true; + maybe_connected(); } void Endpoint::set_local_info(const EndpointInfo& info) { @@ -117,6 +130,7 @@ void Endpoint::set_local_info(const EndpointInfo& info) { } void Endpoint::maybe_connected() { + LOG(DEBUG) << "maybe_connected"; if (connected()) { on_connected_(this); } @@ -126,6 +140,7 @@ void Endpoint::on_read(std::weak_ptr weak_this, const uint8_t* data, u const Address& remote_addr, const int64_t& packet_time_us) { auto shared_this = weak_this.lock(); if (shared_this == nullptr) { + LOG(WARNING) << "shared_this == nullptr"; return; } StunMessage msg{reinterpret_cast(data), @@ -145,8 +160,12 @@ void Endpoint::on_read(std::weak_ptr weak_this, const uint8_t* data, u } else { if (connected()) { + LOG(INFO) << "ON_READ_ CONNECTED"; on_read_(this, data, size, packet_time_us); } + else { + LOG(INFO) << "Onread but not connected"; + } } } diff --git a/transport/rtc2/src/modules/p2p/endpoint.h b/transport/rtc2/src/modules/p2p/endpoint.h index 06fd503..60b0d20 100644 --- a/transport/rtc2/src/modules/p2p/endpoint.h +++ b/transport/rtc2/src/modules/p2p/endpoint.h @@ -69,6 +69,7 @@ protected: std::function on_read); UDPSocket* sock(); void send_binding_request(const Address& addr); + void send_binding_response(const Address& addr, const std::vector& id); bool connected() const; void set_received_request(); void set_received_response(); diff --git a/transport/rtc2/src/modules/p2p/lan_endpoint.cpp b/transport/rtc2/src/modules/p2p/lan_endpoint.cpp index 4979927..b4b7de4 100644 --- a/transport/rtc2/src/modules/p2p/lan_endpoint.cpp +++ b/transport/rtc2/src/modules/p2p/lan_endpoint.cpp @@ -30,19 +30,22 @@ #include "lan_endpoint.h" +#include + namespace rtc2 { -std::unique_ptr LanEndpoint::create(const Params& params) { +std::shared_ptr LanEndpoint::create(const Params& params) { auto udp_socket = params.network_channel->createUDPSocket(params.addr); if (udp_socket == nullptr) { return nullptr; } - - std::unique_ptr ep{new LanEndpoint(std::move(udp_socket), params.network_channel, + uint16_t port = udp_socket->port(); + std::shared_ptr ep{new LanEndpoint(std::move(udp_socket), params.network_channel, params.on_connected, params.on_read)}; ep->init(); EndpointInfo info{}; info.address = params.addr; + info.address.set_port(port); info.type = EndpointType::Lan; ep->set_local_info(info); params.on_endpoint_info(info); @@ -66,23 +69,23 @@ void LanEndpoint::on_binding_request(const StunMessage& msg, const Address& remo const int64_t& packet_time_us) { (void)msg; (void)packet_time_us; - // 此处只验证对方IP:Port,不浪费更多资源,因为上面还有一层DTLS验证 - // 后面如果确实想要在此处做进一步验证,可以用传入的p2p_username和p2p_password做HMAC校验 + LOG(INFO) << "on_binding_request"; if (remote_addr != remote_info().address) { return; } set_received_request(); + send_binding_response(remote_addr, msg.id()); } void LanEndpoint::on_binding_response(const StunMessage& msg, const Address& remote_addr, const int64_t& packet_time_us) { (void)msg; (void)packet_time_us; - // 此处只验证对方IP:Port,不浪费更多资源,因为上面还有一层DTLS验证 - // 后面如果确实想要在此处做进一步验证,可以用传入的p2p_username和p2p_password做HMAC校验 + LOG(INFO) << "on_binding_response"; if (remote_addr != remote_info().address) { return; } set_received_response(); } + } // namespace rtc2 \ No newline at end of file diff --git a/transport/rtc2/src/modules/p2p/lan_endpoint.h b/transport/rtc2/src/modules/p2p/lan_endpoint.h index 29bf6d4..b5351e0 100644 --- a/transport/rtc2/src/modules/p2p/lan_endpoint.h +++ b/transport/rtc2/src/modules/p2p/lan_endpoint.h @@ -44,7 +44,7 @@ public: }; public: - static std::unique_ptr create(const Params& params); + static std::shared_ptr create(const Params& params); int32_t send(std::vector> spans) override; EndpointType type() const override; diff --git a/transport/rtc2/src/modules/p2p/netcard.cpp b/transport/rtc2/src/modules/p2p/netcard.cpp index 6542394..16046be 100644 --- a/transport/rtc2/src/modules/p2p/netcard.cpp +++ b/transport/rtc2/src/modules/p2p/netcard.cpp @@ -37,11 +37,12 @@ #else #endif +#include + namespace rtc2 { #if defined(LT_WINDOWS) -// 后面试试libuv,不搞那么多#if Address getNetcardAddress() { Address result{}; ULONG flags = (GAA_FLAG_SKIP_DNS_SERVER | GAA_FLAG_SKIP_ANYCAST | GAA_FLAG_SKIP_MULTICAST | @@ -69,10 +70,8 @@ Address getNetcardAddress() { } PIP_ADAPTER_UNICAST_ADDRESS address = adapters->FirstUnicastAddress; if (address != nullptr) { - sockaddr_in* addr = reinterpret_cast(address->Address.lpSockaddr); - sockaddr_storage storage{}; - memcpy(&storage, addr, sizeof(sockaddr_in)); - result = Address::from_storage(storage); + sockaddr* addr = reinterpret_cast(address->Address.lpSockaddr); + result = Address::from_sockaddr(addr); return result; } adapters = adapters->Next; diff --git a/transport/rtc2/src/modules/p2p/p2p.cpp b/transport/rtc2/src/modules/p2p/p2p.cpp index be46ba2..5f17ba9 100644 --- a/transport/rtc2/src/modules/p2p/p2p.cpp +++ b/transport/rtc2/src/modules/p2p/p2p.cpp @@ -120,6 +120,7 @@ void P2P::post_delayed_task(uint32_t delayed_ms, const std::function& ta } void P2P::create_lan_endpoint() { + LOG(INFO) << "create_lan_endpoint"; Address netcard_addr = getNetcardAddress(); if (netcard_addr.family() == -1) { LOG(WARNING) << "getNetcardAddress failed, no NIC"; diff --git a/transport/rtc2/src/modules/p2p/stuns/easy_stun.cpp b/transport/rtc2/src/modules/p2p/stuns/easy_stun.cpp index b31f96e..45e12f0 100644 --- a/transport/rtc2/src/modules/p2p/stuns/easy_stun.cpp +++ b/transport/rtc2/src/modules/p2p/stuns/easy_stun.cpp @@ -81,6 +81,10 @@ const uint8_t* StunMessage::data() const { return msg_->data(); } +std::vector StunMessage::id() const { + return msg_->id(); +} + uint8_t* StunMessage::data() { return msg_->data(); } diff --git a/transport/rtc2/src/modules/p2p/stuns/easy_stun.h b/transport/rtc2/src/modules/p2p/stuns/easy_stun.h index 6f845aa..d48d4f9 100644 --- a/transport/rtc2/src/modules/p2p/stuns/easy_stun.h +++ b/transport/rtc2/src/modules/p2p/stuns/easy_stun.h @@ -33,6 +33,7 @@ #include #include +#include #include @@ -60,6 +61,7 @@ public: bool verify() const; Type type() const; const uint8_t* data() const; + std::vector id() const; uint8_t* data(); size_t size() const; std::optional
mapped_address() const; diff --git a/transport/rtc2/src/modules/p2p/stuns/message.h b/transport/rtc2/src/modules/p2p/stuns/message.h index 0bf6975..ee848f3 100644 --- a/transport/rtc2/src/modules/p2p/stuns/message.h +++ b/transport/rtc2/src/modules/p2p/stuns/message.h @@ -641,6 +641,12 @@ public: uint8_t* data() { return buffer_.data(); } const uint8_t* data() const { return buffer_.data(); } + std::vector id() const { + std::vector tid(12); + memcpy(tid.data(), hdr()->tsx_id, 12); + return tid; + } + size_t size() const { return stun_msg_len(hdr()); } bool verify() const { return stun_msg_verify(hdr(), capacity()) == 0 ? false : true; } diff --git a/transport/rtc2/src/rtc2.cpp b/transport/rtc2/src/rtc2.cpp index f6adb0d..418e9e2 100644 --- a/transport/rtc2/src/rtc2.cpp +++ b/transport/rtc2/src/rtc2.cpp @@ -76,6 +76,10 @@ std::unique_ptr Client::create(const Params& params) { conn_params.is_server = false; conn_params.key_and_cert = params.key_and_cert; conn_params.remote_digest = params.remote_digest; + conn_params.on_signaling_message = + [cb = params.on_signaling_message](const std::string& key, const std::string& value) { + cb(key.c_str(), value.c_str()); + }; // auto conn = Connection::create(conn_params); if (conn == nullptr) { @@ -132,9 +136,13 @@ std::unique_ptr Server::create(const Params& params) { data_param.on_data = params.on_data; conn_params.data = data_param; // others - conn_params.is_server = false; + conn_params.is_server = true; conn_params.key_and_cert = params.key_and_cert; conn_params.remote_digest = params.remote_digest; + conn_params.on_signaling_message = + [cb = params.on_signaling_message](const std::string& key, const std::string& value) { + cb(key.c_str(), value.c_str()); + }; // auto conn = Connection::create(conn_params); if (conn == nullptr) {