Merge branch rtc2 (#35)

* 添加编译选项选择tcp,rtc,rtc2

* rtc2 test 1

* rtc2 test 2

* rtc2 test 3

* rtc2 test 4 p2p
This commit is contained in:
Zhennan Tu
2023-10-08 21:57:00 +08:00
committed by GitHub
parent f331353fa2
commit c2e6f71f01
30 changed files with 336 additions and 121 deletions

1
.gitignore vendored
View File

@@ -36,6 +36,7 @@
/.vscode
/.vs
/build
/install
CMakeSettings.json
options-user.cmake

View File

@@ -5,6 +5,9 @@ project(lanthing-pc)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeRC.cmake)
set(LT_TRANSPORT_RTC 1)
set(LT_TRANSPORT_RTC2 2)
set(LT_TRANSPORT_TCP 3)
if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/options-user.cmake)
include(${CMAKE_CURRENT_SOURCE_DIR}/options-user.cmake)
else()
@@ -55,8 +58,13 @@ add_definitions(-DLT_SERVER_USE_SSL=$<IF:$<BOOL:${LT_SERVER_USE_SSL}>,true,false
add_definitions(-DLT_RUN_AS_SERVICE=$<IF:$<BOOL:${LT_RUN_AS_SERVICE}>,true,false>)
add_definitions(-DLT_WIN_SERVICE_NAME=${LT_WIN_SERVICE_NAME})
add_definitions(-DLT_WIN_SERVICE_DISPLAY_NAME=${LT_WIN_SERVICE_DISPLAY_NAME})
#
add_definitions(-DLT_USE_LTRTC=$<IF:$<BOOL:${LT_USE_LTRTC}>,true,false>)
#transport
add_definitions(-DLT_TRANSPORT_RTC=${LT_TRANSPORT_RTC})
add_definitions(-DLT_TRANSPORT_RTC2=${LT_TRANSPORT_RTC2})
add_definitions(-DLT_TRANSPORT_TCP=${LT_TRANSPORT_TCP})
add_definitions(-DLT_TRANSPORT_TYPE=${LT_TRANSPORT_TYPE})
#misc
add_definitions(-DLT_CRASH_ON_THREAD_HANGS=$<IF:$<BOOL:${LT_CRASH_ON_THREAD_HANGS}>,true,false>)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/certs)

View File

@@ -9,6 +9,6 @@ $> ./build.ps1 build [Debug|Release]
```
或者:
```
$> cmake -B build/[Debug|Release] --DCMAKE_BUILD_TYPE=[Debug|Release] -DCMAKE_INSTALL_PREFIX=install/[Debug|Release]
$> cmake -B build/[Debug|Release] -DCMAKE_BUILD_TYPE=[Debug|Release] -DCMAKE_INSTALL_PREFIX=install/[Debug|Release]
$> cmake --build build/[Debug|Release] --config [Debug|Release] --target install
```

View File

@@ -62,6 +62,10 @@ void sigint_handler(int) {
std::terminate();
}
void terminateCallback(const std::string& last_word) {
LOG(INFO) << "Last words: " << last_word;
}
void initLogging() {
std::string bin_path = ltlib::getProgramFullpath<char>();
std::string bin_dir = ltlib::getProgramPath<char>();
@@ -88,13 +92,18 @@ void initLogging() {
g3::only_change_at_initialization::addLogLevel(ERR);
g3::log_levels::disable(DEBUG);
g3::initializeLogging(g_logWorker.get());
ltlib::ThreadWatcher::instance()->registerTerminateCallback(
[](const std::string& last_word) { LOG(INFO) << "Last words: " << last_word; });
LOG(INFO) << "Log system initialized";
g_minidumpGenertator = std::make_unique<LTMinidumpGenerator>(log_dir.string());
signal(SIGINT, sigint_handler);
if (LT_CRASH_ON_THREAD_HANGS) {
ltlib::ThreadWatcher::instance()->enableCrashOnTimeout();
ltlib::ThreadWatcher::instance()->registerTerminateCallback(terminateCallback);
}
else {
ltlib::ThreadWatcher::instance()->disableCrashOnTimeout();
}
}
} // namespace

View File

@@ -182,6 +182,7 @@ target_link_libraries(${PROJECT_NAME}
ltlib
ltproto
rtc
nbp2p
transport_api
transport
#fonts

View File

@@ -49,11 +49,9 @@
#include <ltlib/time_sync.h>
#include <string_keys.h>
#if LT_USE_LTRTC
#include <rtc/rtc.h>
#else // LT_USE_LTRTC
#include <transport/transport_rtc.h>
#include <transport/transport_rtc2.h>
#include <transport/transport_tcp.h>
#endif // LT_USE_LTRTC
namespace {
@@ -74,11 +72,17 @@ lt::VideoCodecType to_ltrtc(std::string codec_str) {
}
lt::AudioCodecType atype() {
#if LT_USE_LTRTC
return lt::AudioCodecType::PCM;
#else
return lt::AudioCodecType::OPUS;
#endif
switch (LT_TRANSPORT_TYPE) {
case LT_TRANSPORT_RTC:
return lt::AudioCodecType::PCM;
case LT_TRANSPORT_RTC2:
return lt::AudioCodecType::PCM;
case LT_TRANSPORT_TCP:
return lt::AudioCodecType::OPUS;
default:
LOG(FATAL) << "Unknown transport type";
return lt::AudioCodecType::OPUS;
}
}
} // namespace
@@ -356,6 +360,8 @@ void Client::onSignalingMessage(std::shared_ptr<google::protobuf::MessageLite> _
case ltproto::signaling::SignalingMessage::Rtc:
{
auto& rtc_msg = msg->rtc_message();
LOG(INFO) << "Received signaling key:" << msg->rtc_message().key().c_str()
<< ", value:" << msg->rtc_message().value().c_str();
tp_client_->onSignalingMessage(rtc_msg.key().c_str(), rtc_msg.value().c_str());
break;
}
@@ -380,8 +386,47 @@ void Client::onSignalingMessageAck(std::shared_ptr<google::protobuf::MessageLite
}
bool Client::initTransport() {
switch (LT_TRANSPORT_TYPE) {
case LT_TRANSPORT_TCP:
tp_client_ = createTcpClient();
break;
case LT_TRANSPORT_RTC:
tp_client_ = createRtcClient();
break;
case LT_TRANSPORT_RTC2:
tp_client_ = createRtc2Client();
break;
default:
break;
}
if (tp_client_ == nullptr) {
LOG(ERR) << "Create lt::tp::Client failed";
return false;
}
if (!tp_client_->connect()) {
LOG(INFO) << "lt::tp::Client connect failed";
return false;
}
return true;
}
std::unique_ptr<tp::Client> Client::createTcpClient() {
namespace ph = std::placeholders;
lt::tp::ClientTCP::Params params{};
params.on_data = std::bind(&Client::onTpData, this, ph::_1, ph::_2, ph::_3);
params.on_video = std::bind(&Client::onTpVideoFrame, this, ph::_1);
params.on_audio = std::bind(&Client::onTpAudioData, this, ph::_1);
params.on_connected = std::bind(&Client::onTpConnected, this);
params.on_failed = std::bind(&Client::onTpFailed, this);
params.on_disconnected = std::bind(&Client::onTpDisconnected, this);
params.on_signaling_message = std::bind(&Client::onTpSignalingMessage, this, ph::_1, ph::_2);
params.video_codec_type = video_params_.codec_type;
return lt::tp::ClientTCP::create(params);
}
std::unique_ptr<tp::Client> Client::createRtcClient() {
namespace ph = std::placeholders;
#if LT_USE_LTRTC
rtc::Client::Params params{};
params.use_nbp2p = true;
std::vector<const char*> reflex_servers;
@@ -413,29 +458,26 @@ bool Client::initTransport() {
params.video_codec_type = video_params_.codec_type;
params.audio_channels = audio_params_.channels;
params.audio_sample_rate = audio_params_.frames_per_second;
tp_client_ = rtc::Client::create(std::move(params));
#else // LT_USE_LTRTC
lt::tp::ClientTCP::Params params{};
return rtc::Client::create(std::move(params));
}
std::unique_ptr<tp::Client> Client::createRtc2Client() {
namespace ph = std::placeholders;
rtc2::Client::Params params{};
params.on_data = std::bind(&Client::onTpData, this, ph::_1, ph::_2, ph::_3);
params.on_video = std::bind(&Client::onTpVideoFrame, this, ph::_1);
params.on_audio = std::bind(&Client::onTpAudioData, this, ph::_1);
params.on_connected = std::bind(&Client::onTpConnected, this);
params.on_conn_changed = std::bind(&Client::onTpConnChanged, this);
params.on_failed = std::bind(&Client::onTpFailed, this);
params.on_disconnected = std::bind(&Client::onTpDisconnected, this);
params.onSignalingMessage = std::bind(&Client::onTpSignalingMessage, this, ph::_1, ph::_2);
params.video_codec_type = video_params_.codec_type;
tp_client_ = lt::tp::ClientTCP::create(params);
#endif // LT_USE_LTRTC
if (tp_client_ == nullptr) {
LOG(INFO) << "Create rtc client failed";
return false;
}
if (!tp_client_->connect()) {
LOG(INFO) << "LTClient connect failed";
return false;
}
return true;
params.on_signaling_message = std::bind(&Client::onTpSignalingMessage, this, ph::_1, ph::_2);
params.audio_recv_ssrc = 687154681;
params.video_recv_ssrc = 541651314;
// TODO: key and cert合理的创建时机
params.key_and_cert = rtc2::KeyAndCert::create();
params.remote_digest;
return rtc2::Client::create(params);
}
void Client::onTpData(const uint8_t* data, uint32_t size, bool is_reliable) {

View File

@@ -113,6 +113,9 @@ private:
// transport
bool initTransport();
std::unique_ptr<tp::Client> createTcpClient();
std::unique_ptr<tp::Client> createRtcClient();
std::unique_ptr<tp::Client> createRtc2Client();
void onTpData(const uint8_t* data, uint32_t size, bool is_reliable);
void onTpVideoFrame(const lt::VideoFrame& frame);
void onTpAudioData(const lt::AudioData& audio_data);

View File

@@ -50,9 +50,9 @@
#include <service/service.h>
#endif
#if LT_USE_LTRTC
#if LT_TRANSPORT_TYPE == LT_TRANSPORT_RTC
#include <rtc/rtc.h>
#endif // LT_USE_LTRTC
#endif
#include "firewall.h"
@@ -82,6 +82,10 @@ void sigint_handler(int) {
::exit(0);
}
void terminateCallback(const std::string& last_word) {
LOG(INFO) << "Last words: " << last_word;
}
void initLogAndMinidump(Role role) {
std::string prefix;
std::string rtc_prefix;
@@ -122,24 +126,29 @@ void initLogAndMinidump(Role role) {
}
}
g_log_worker = g3::LogWorker::createLogWorker();
g_log_worker->addSink(std::make_unique<ltlib::LogSink>(prefix, log_dir.string()),
&ltlib::LogSink::fileWrite);
g_log_worker->addSink(
std::make_unique<ltlib::LogSink>(prefix, log_dir.string(), 30 /*flush_every_30_logs*/),
&ltlib::LogSink::fileWrite);
g3::log_levels::disable(DEBUG);
g3::only_change_at_initialization::addLogLevel(ERR);
g3::initializeLogging(g_log_worker.get());
ltlib::ThreadWatcher::instance()->registerTerminateCallback(
[](const std::string& last_word) { LOG(INFO) << "Last words: " << last_word; });
if ((role == Role::Service || role == Role::Client) && !rtc_prefix.empty()) {
#if LT_USE_LTRTC
#if LT_TRANSPORT_TYPE == LT_TRANSPORT_RTC
rtc::initLogging(log_dir.string().c_str(), rtc_prefix.c_str());
#endif // LT_USE_LTRTC
#endif
}
LOG(INFO) << "Log system initialized";
// g3log必须再minidump前初始化
g_minidump_genertator = std::make_unique<LTMinidumpGenerator>(log_dir.string());
signal(SIGINT, sigint_handler);
// ltlib::ThreadWatcher::instance()->disable_crash_on_timeout();
if (LT_CRASH_ON_THREAD_HANGS) {
ltlib::ThreadWatcher::instance()->enableCrashOnTimeout();
ltlib::ThreadWatcher::instance()->registerTerminateCallback(terminateCallback);
}
else {
ltlib::ThreadWatcher::instance()->disableCrashOnTimeout();
}
}
std::map<std::string, std::string> parseOptions(int argc, char* argv[]) {
@@ -244,7 +253,7 @@ int main(int argc, char* argv[]) {
}
else if (iter->second == "client") {
// 方便调试attach
// std::this_thread::sleep_for(std::chrono::seconds { 15 });
// std::this_thread::sleep_for(std::chrono::seconds{15});
return runAsClient(options);
}
else if (iter->second == "worker") {

View File

@@ -55,11 +55,9 @@
#include <ltlib/system.h>
#include <ltlib/times.h>
#if LT_USE_LTRTC
#include <rtc/rtc.h>
#else // LT_USE_LTRTC
#include <transport/transport_rtc.h>
#include <transport/transport_rtc2.h>
#include <transport/transport_tcp.h>
#endif // LT_USE_LTRTC
#include "worker_process.h"
#include <string_keys.h>
@@ -207,11 +205,51 @@ bool WorkerSession::init(std::shared_ptr<google::protobuf::MessageLite> _msg) {
return true;
}
bool WorkerSession::initRtcServer() {
bool WorkerSession::initTransport() {
switch (LT_TRANSPORT_TYPE) {
case LT_TRANSPORT_TCP:
tp_server_ = createTcpServer();
break;
case LT_TRANSPORT_RTC:
tp_server_ = createRtcServer();
break;
case LT_TRANSPORT_RTC2:
tp_server_ = createRtc2Server();
break;
default:
break;
}
if (tp_server_ == nullptr) {
LOG(ERR) << "Create transport server failed";
return false;
}
else {
rtc_closed_ = false;
return true;
}
}
std::unique_ptr<tp::Server> WorkerSession::createTcpServer() {
namespace ph = std::placeholders;
auto negotiated_params =
std::static_pointer_cast<ltproto::peer2peer::StreamingParams>(negotiated_streaming_params_);
lt::tp::ServerTCP::Params params{};
params.video_codec_type = ::to_ltrtc(
static_cast<ltproto::peer2peer::VideoCodecType>(negotiated_params->video_codecs().Get(0)));
params.on_failed = std::bind(&WorkerSession::onTpFailed, this);
params.on_disconnected = std::bind(&WorkerSession::onTpDisconnected, this);
params.on_accepted = std::bind(&WorkerSession::onTpAccepted, this);
params.on_data = std::bind(&WorkerSession::onTpData, this, ph::_1, ph::_2, ph::_3);
params.on_signaling_message =
std::bind(&WorkerSession::onTpSignalingMessage, this, ph::_1, ph::_2);
return lt::tp::ServerTCP::create(params);
}
std::unique_ptr<tp::Server> WorkerSession::createRtcServer() {
namespace ph = std::placeholders;
#if LT_USE_LTRTC
auto negotiated_params =
std::static_pointer_cast<ltproto::peer2peer::StreamingParams>(negotiated_streaming_params_);
rtc::Server::Params params{};
params.use_nbp2p = true;
std::vector<const char*> reflex_servers;
@@ -253,27 +291,31 @@ bool WorkerSession::initRtcServer() {
params.on_video_bitrate_update =
std::bind(&WorkerSession::onTpEesimatedVideoBitreateUpdate, this, ph::_1);
params.on_loss_rate_update = std::bind(&WorkerSession::onTpLossRateUpdate, this, ph::_1);
tp_server_ = rtc::Server::create(std::move(params));
#else // LT_USE_LTRTC
lt::tp::ServerTCP::Params params{};
params.video_codec_type = ::to_ltrtc(negotiated_params->video_codecs().Get(0));
return rtc::Server::create(std::move(params));
}
std::unique_ptr<tp::Server> WorkerSession::createRtc2Server() {
namespace ph = std::placeholders;
rtc2::Server::Params params{};
params.on_accepted = std::bind(&WorkerSession::onTpAccepted, this);
params.on_failed = std::bind(&WorkerSession::onTpFailed, this);
params.on_disconnected = std::bind(&WorkerSession::onTpDisconnected, this);
params.on_accepted = std::bind(&WorkerSession::onTpAccepted, this);
params.on_conn_changed = std::bind(&WorkerSession::onTpConnChanged, this);
params.on_data = std::bind(&WorkerSession::onTpData, this, ph::_1, ph::_2, ph::_3);
params.onSignalingMessage =
params.on_signaling_message =
std::bind(&WorkerSession::onTpSignalingMessage, this, ph::_1, ph::_2);
tp_server_ = lt::tp::ServerTCP::create(params);
#endif // LT_USE_LTRTC
if (tp_server_ == nullptr) {
LOG(ERR) << "Create rtc server failed";
return false;
}
else {
rtc_closed_ = false;
return true;
params.on_keyframe_request = std::bind(&WorkerSession::onTpRequestKeyframe, this);
params.on_video_bitrate_update =
std::bind(&WorkerSession::onTpEesimatedVideoBitreateUpdate, this, ph::_1);
params.on_loss_rate_update = std::bind(&WorkerSession::onTpLossRateUpdate, this, ph::_1);
params.remote_digest;
params.key_and_cert = rtc2::KeyAndCert::create();
if (params.key_and_cert == nullptr) {
return nullptr;
}
params.video_send_ssrc = 541651314;
params.audio_send_ssrc = 687154681;
return rtc2::Server::create(params);
}
void WorkerSession::mainLoop(const std::function<void()>& i_am_alive) {
@@ -337,7 +379,7 @@ void WorkerSession::maybeOnCreateSessionCompleted() {
if (negotiated_streaming_params_ == nullptr) {
return;
}
if (!initRtcServer()) {
if (!initTransport()) {
on_create_session_completed_(false, session_name_, empty_params);
return;
}
@@ -456,6 +498,8 @@ void WorkerSession::onSignalingMessageAck(std::shared_ptr<google::protobuf::Mess
void WorkerSession::dispatchSignalingMessageRtc(
std::shared_ptr<google::protobuf::MessageLite> _msg) {
auto msg = std::static_pointer_cast<ltproto::signaling::SignalingMessage>(_msg);
LOG(INFO) << "Received signaling key:" << msg->rtc_message().key().c_str()
<< ", value:" << msg->rtc_message().value().c_str();
tp_server_->onSignalingMessage(msg->rtc_message().key().c_str(),
msg->rtc_message().value().c_str());
}

View File

@@ -79,14 +79,16 @@ private:
on_create_completed,
std::function<void(CloseReason, const std::string&, const std::string&)> on_closed);
bool init(std::shared_ptr<google::protobuf::MessageLite> msg);
bool initRtcServer();
bool initTransport();
std::unique_ptr<tp::Server> createTcpServer();
std::unique_ptr<tp::Server> createRtcServer();
std::unique_ptr<tp::Server> createRtc2Server();
void createWorkerProcess(uint32_t client_width, uint32_t client_height,
uint32_t client_refresh_rate,
std::vector<lt::VideoCodecType> client_codecs);
void mainLoop(const std::function<void()>& i_am_alive);
void onClosed(CloseReason reason);
void maybeOnCreateSessionCompleted();
bool createVideoEncoder();
void postTask(const std::function<void()>& task);
void postDelayTask(int64_t delay_ms, const std::function<void()>& task);

View File

@@ -213,7 +213,6 @@ bool Worker::init() {
}
}
ioloop_->postDelay(500 /*ms*/, std::bind(&Worker::checkCimeout, this));
std::promise<void> promise;
auto future = promise.get_future();
thread_ = ltlib::BlockingThread::create(
@@ -222,6 +221,7 @@ bool Worker::init() {
mainLoop(i_am_alive);
});
future.get();
ioloop_->postDelay(500 /*ms*/, std::bind(&Worker::checkCimeout, this));
return true;
} // namespace lt

View File

@@ -9,4 +9,5 @@ set(LT_SERVER_USE_SSL OFF)
set(LT_RUN_AS_SERVICE OFF)
set(LT_WIN_SERVICE_NAME Lanthing)
set(LT_WIN_SERVICE_DISPLAY_NAME "Lanthing Service")
set(LT_USE_LTRTC ON)
set(LT_TRANSPORT_TYPE ${LT_TRANSPORT_RTC})
set(LT_CRASH_ON_THREAD_HANGS ON)

View File

@@ -166,6 +166,7 @@ bool ConnectionImpl::init() {
// message channel
MessageChannel::Params msg_param{};
msg_param.network_channel = network_channel_.get();
msg_param.reliable_ssrc = 0;
msg_param.half_reliable_ssrc = 0;
msg_param.mtu = 1400;
@@ -191,15 +192,12 @@ bool ConnectionImpl::init() {
if (dtls_ == nullptr) {
return false;
}
return true;
}
void ConnectionImpl::start() {
started_ = true;
network_channel_->start();
// TODO: 改成连接成功建立后post
send_thread_->post_delay(ltlib::TimeDelta{1000} /*us*/,
std::bind(&Pacer::process, pacer_.get(), pacer_->weak_from_this()));
}
bool ConnectionImpl::sendData(const uint8_t* data, uint32_t size) {
@@ -231,7 +229,7 @@ void ConnectionImpl::onSignalingMessage(const std::string& key, const std::strin
LOG(ERR) << "Received unknown signaling message key:" << key;
return;
}
std::istringstream iss;
std::istringstream iss(value);
std::string key1, key2, type, addr;
iss >> key1;
iss >> type;
@@ -252,6 +250,10 @@ void ConnectionImpl::onSignalingMessage(const std::string& key, const std::strin
LOG(ERR) << "Invalid address " << addr;
return;
}
if (params_.is_server && !started_) {
started_ = true;
network_channel_->start();
}
network_channel_->addRemoteInfo(info);
}
@@ -323,15 +325,13 @@ void ConnectionImpl::onDtlsPacket(const uint8_t* data, uint32_t size, int64_t ti
}
void ConnectionImpl::onDtlsConnected() {
// LOG(INFO) << "";
LOG(INFO) << "Connected";
}
void ConnectionImpl::onDtlsDisconnected() {
// TODO: 整个断链
LOG(INFO) << "Disconnected";
}
// 原本让EndpointInfo回调到上层让上层按照自己的方式做序列化会更好
// 但具体到这个项目lanthing和rtc2是一体的你我之间不必拿刻度尺分太清怎么方便怎么来
void ConnectionImpl::onEndpointInfo(const EndpointInfo& info) {
std::ostringstream oss;
oss << FieldType << " " << to_str(info.type) << " " << FieldAddr << " "

View File

@@ -85,6 +85,7 @@ private:
std::vector<std::shared_ptr<AudioReceiveStream>> audio_receive_streams_;
std::shared_ptr<MessageChannel> message_channel_;
std::shared_ptr<DtlsChannel> dtls_;
std::atomic<bool> started_ = false;
};
} // namespace rtc2

View File

@@ -89,9 +89,9 @@ bool KeyAndCertImpl::createInternal() {
});
mbedtls_entropy_init(entropy.get());
mbedtls_ctr_drbg_init(drbg.get());
std::unique_ptr<uint8_t[]> seed{new uint8_t[MBEDTLS_ENTROPY_MAX_SEED_SIZE]};
int ret = mbedtls_ctr_drbg_seed(drbg.get(), mbedtls_entropy_func, entropy.get(), seed.get(),
MBEDTLS_ENTROPY_MAX_SEED_SIZE);
std::unique_ptr<uint8_t[]> seed{new uint8_t[128]};
int ret =
mbedtls_ctr_drbg_seed(drbg.get(), mbedtls_entropy_func, entropy.get(), seed.get(), 128);
if (ret != 0) {
LOG(ERR) << "mbedtls_ctr_drbg_seed failed " << ret;
return false;

View File

@@ -109,7 +109,7 @@ bool MbedDtls::init() {
if (!tls_init_context()) {
return false;
}
if (tls_init_engine()) {
if (!tls_init_engine()) {
return false;
}
return true;
@@ -238,7 +238,7 @@ bool MbedDtls::tls_init_context() {
const mbedtls_ssl_ciphersuite_t* c = mbedtls_ssl_ciphersuite_from_string(name.c_str());
if (c != nullptr) {
ciphersuites_.push_back(c->MBEDTLS_PRIVATE(id));
printf("Adding ciphersuite (%#x) %s\n", c->MBEDTLS_PRIVATE(id), name.c_str());
LOGF(DEBUG, "Adding ciphersuite (%#x) %s\n", c->MBEDTLS_PRIVATE(id), name.c_str());
}
}
if (ciphersuites_.empty()) {
@@ -251,9 +251,8 @@ bool MbedDtls::tls_init_context() {
mbedtls_ssl_conf_min_version(&ssl_cfg_, MBEDTLS_SSL_MAJOR_VERSION_3,
MBEDTLS_SSL_MINOR_VERSION_3);
mbedtls_ssl_conf_verify(&ssl_cfg_, &verify_cert, this);
std::unique_ptr<uint8_t[]> seed{new uint8_t[MBEDTLS_ENTROPY_MAX_SEED_SIZE]};
int ret = mbedtls_ctr_drbg_seed(&drbg_, mbedtls_entropy_func, &entropy_, seed.get(),
MBEDTLS_ENTROPY_MAX_SEED_SIZE);
std::unique_ptr<uint8_t[]> seed{new uint8_t[128]};
int ret = mbedtls_ctr_drbg_seed(&drbg_, mbedtls_entropy_func, &entropy_, seed.get(), 128);
if (ret != 0) {
LOG(ERR) << "mbedtls_ctr_drbg_seed failed " << ret;
return false;

View File

@@ -285,22 +285,30 @@ sockaddr_storage Address::to_storage() const {
}
Address Address::from_storage(const sockaddr_storage& storage) {
return from_storage(&storage);
}
Address Address::from_storage(const sockaddr_storage* storage) {
Address addr{};
if (storage.ss_family == AF_INET) {
const sockaddr_in* in4 = reinterpret_cast<const sockaddr_in*>(&storage);
if (storage->ss_family == AF_INET) {
const sockaddr_in* in4 = reinterpret_cast<const sockaddr_in*>(storage);
addr.family_ = AF_INET;
addr.port_ = in4->sin_port;
addr.port_ = ntohs(in4->sin_port);
addr.ip_.v4 = in4->sin_addr;
}
else if (storage.ss_family == AF_INET6) {
const sockaddr_in6* in6 = reinterpret_cast<const sockaddr_in6*>(&storage);
else if (storage->ss_family == AF_INET6) {
const sockaddr_in6* in6 = reinterpret_cast<const sockaddr_in6*>(storage);
addr.family_ = AF_INET6;
addr.port_ = in6->sin6_port;
addr.port_ = ntohs(in6->sin6_port);
addr.ip_.v6 = in6->sin6_addr;
}
return addr;
}
Address Address::from_sockaddr(const sockaddr* sockaddr) {
return from_storage(reinterpret_cast<const sockaddr_storage*>(sockaddr));
}
Address Address::from_str(const std::string& str) {
if (str.at(0) == '[') {
std::string::size_type closebracket = str.rfind(']');

View File

@@ -103,6 +103,8 @@ public:
sockaddr_storage& to_storage(sockaddr_storage& storage) const;
sockaddr_storage to_storage() const;
static Address from_storage(const sockaddr_storage& storage);
static Address from_storage(const sockaddr_storage* storage);
static Address from_sockaddr(const sockaddr* sockaddr);
static Address from_str(const std::string& str);
private:

View File

@@ -39,9 +39,35 @@
namespace rtc2 {
// static void printNetworkAdapters() {
// uv_interface_address_t* info;
// int count;
// char buf[512];
// uv_interface_addresses(&info, &count);
// for (int i = 0; i < count; i++) {
// uv_interface_address_t interface_a = info[i];
// if (interface_a.address.address4.sin_family == AF_INET) {
//
// uv_ip4_name(&interface_a.address.address4, buf, sizeof(buf));
//
// printf("IPv4 address: %s\n", buf);
// }
//
// else if (interface_a.address.address4.sin_family == AF_INET6) {
//
// uv_ip6_name(&interface_a.address.address6, buf, sizeof(buf));
//
// printf("IPv6 address: %s\n", buf);
// }
// LOG(INFO) << interface_a.name << " " << interface_a.is_internal << " " << buf << " "
// << std::string(&interface_a.phys_addr[0], 6).c_str();
// }
// }
NetworkChannel::NetworkChannel(const Params& p)
: on_error_{p.on_error}
, on_endpoint_info_gathered_{p.on_endpoint_info_gathered} {
// printNetworkAdapters();
P2P::Params params{};
params.is_server = p.is_server;
params.network_channel = this;
@@ -96,7 +122,12 @@ void NetworkChannel::setOnConnChanged(
on_conn_changed_ = on_conn_changed;
}
// 跑在用户线程 | 跑在网络线程
void NetworkChannel::addRemoteInfo(const EndpointInfo& info) {
if (ioloop_->isNotCurrentThread()) {
post(std::bind(&NetworkChannel::addRemoteInfo, this, info));
return;
}
p2p_->add_remote_info(info);
}

View File

@@ -69,7 +69,7 @@ std::shared_ptr<UDPSocketImpl> UDPSocketImpl::create(ltlib::IOLoop* ioloop,
return nullptr;
}
auto storage = bind_addr.to_storage();
ret = uv_udp_bind(udp, reinterpret_cast<const sockaddr*>(&storage), UV_UDP_REUSEADDR);
ret = uv_udp_bind(udp, reinterpret_cast<const sockaddr*>(&storage), 0);
if (ret != 0) {
uv_close((uv_handle_t*)udp, [](uv_handle_t* handle) {
auto udp = (uv_udp_t*)handle;
@@ -78,6 +78,18 @@ std::shared_ptr<UDPSocketImpl> UDPSocketImpl::create(ltlib::IOLoop* ioloop,
LOG(ERR) << "uv_udp_bind failed with " << ret;
return nullptr;
}
sockaddr_storage local_storage{};
int socksize = sizeof(local_storage);
ret = uv_udp_getsockname(udp, reinterpret_cast<sockaddr*>(&local_storage), &socksize);
if (ret != 0) {
uv_close((uv_handle_t*)udp, [](uv_handle_t* handle) {
auto udp = (uv_udp_t*)handle;
delete udp;
});
LOG(ERR) << "uv_udp_getsockname failed with " << ret;
return nullptr;
}
Address local_addr = Address::from_storage(local_storage);
ret = uv_udp_recv_start(udp, UDPSocketImpl::on_alloc_memory, UDPSocketImpl::on_udp_recv);
if (ret != 0) {
uv_close((uv_handle_t*)udp, [](uv_handle_t* handle) {
@@ -90,7 +102,7 @@ std::shared_ptr<UDPSocketImpl> UDPSocketImpl::create(ltlib::IOLoop* ioloop,
auto udp_socket = std::make_unique<UDPSocketImpl>();
udp->data = udp_socket.get();
udp_socket->udp_ = udp;
udp_socket->bind_addr_ = bind_addr;
udp_socket->bind_addr_ = local_addr;
return udp_socket;
}
@@ -147,15 +159,14 @@ void UDPSocketImpl::on_udp_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t*
// 在我的使用场景里这个flags似乎不用处理
(void)flags;
auto that = reinterpret_cast<UDPSocketImpl*>(handle->data);
if (nread < 0) {
if (nread <= 0) {
that->error_ = static_cast<int32_t>(nread);
delete buf->base;
// TODO: 通知错误
return;
}
if (that->on_read_ != nullptr) {
// 整个过程没有复制IPv4情况下应该不会爆炸吧
auto address = Address::from_storage(*reinterpret_cast<const sockaddr_storage*>(addr));
auto address = Address::from_sockaddr(addr);
that->on_read_(reinterpret_cast<const uint8_t*>(buf->base), static_cast<uint32_t>(nread),
address, ltlib::steady_now_us());
}

View File

@@ -65,14 +65,6 @@ void Endpoint::post_delayed_task(uint32_t delayed_ms, const std::function<void()
});
}
void Endpoint::add_remote_info(const EndpointInfo& info) {
if (remote_.type != EndpointType::Unknown) {
return;
}
remote_ = info;
send_binding_request(remote_.address);
}
Endpoint::Endpoint(std::unique_ptr<UDPSocket>&& socket, NetworkChannel* network_channel,
std::function<void(Endpoint*)> on_connected,
std::function<void(Endpoint*, const uint8_t*, uint32_t, int64_t)> on_read)
@@ -94,22 +86,43 @@ void Endpoint::send_binding_request(const Address& addr) {
int ret = sock()->sendmsg({{msg.data(), msg.size()}}, addr);
if (ret < 0) {
int error = socket_->error();
LOG(ERR) << "Send binding request to " << remote_.address.to_string()
<< " failed with error " << error;
LOG(ERR) << "Send binding request to " << addr.to_string() << " failed with error "
<< error;
}
post_delayed_task(100, std::bind(&Endpoint::send_binding_request, this, addr));
}
void Endpoint::send_binding_response(const Address& addr, const std::vector<uint8_t>& id) {
StunMessage msg{StunMessage::Type::BindingResponse,
reinterpret_cast<const uint8_t*>(id.data())};
int ret = sock()->sendmsg({{msg.data(), msg.size()}}, addr);
if (ret < 0) {
int error = socket_->error();
LOG(ERR) << "Send binding response to " << addr.to_string() << " failed with error "
<< error;
}
}
void Endpoint::add_remote_info(const EndpointInfo& info) {
if (remote_.type != EndpointType::Unknown) {
return;
}
remote_ = info;
send_binding_request(remote_.address);
}
bool Endpoint::connected() const {
return received_request_ && received_response_;
}
void Endpoint::set_received_request() {
received_request_ = true;
maybe_connected();
}
void Endpoint::set_received_response() {
received_response_ = true;
maybe_connected();
}
void Endpoint::set_local_info(const EndpointInfo& info) {
@@ -117,6 +130,7 @@ void Endpoint::set_local_info(const EndpointInfo& info) {
}
void Endpoint::maybe_connected() {
LOG(DEBUG) << "maybe_connected";
if (connected()) {
on_connected_(this);
}
@@ -126,6 +140,7 @@ void Endpoint::on_read(std::weak_ptr<Endpoint> weak_this, const uint8_t* data, u
const Address& remote_addr, const int64_t& packet_time_us) {
auto shared_this = weak_this.lock();
if (shared_this == nullptr) {
LOG(WARNING) << "shared_this<Endpoint> == nullptr";
return;
}
StunMessage msg{reinterpret_cast<const uint8_t*>(data),
@@ -145,8 +160,12 @@ void Endpoint::on_read(std::weak_ptr<Endpoint> weak_this, const uint8_t* data, u
}
else {
if (connected()) {
LOG(INFO) << "ON_READ_ CONNECTED";
on_read_(this, data, size, packet_time_us);
}
else {
LOG(INFO) << "Onread but not connected";
}
}
}

View File

@@ -69,6 +69,7 @@ protected:
std::function<void(Endpoint*, const uint8_t*, uint32_t, int64_t)> on_read);
UDPSocket* sock();
void send_binding_request(const Address& addr);
void send_binding_response(const Address& addr, const std::vector<uint8_t>& id);
bool connected() const;
void set_received_request();
void set_received_response();

View File

@@ -30,19 +30,22 @@
#include "lan_endpoint.h"
#include <ltlib/logging.h>
namespace rtc2 {
std::unique_ptr<LanEndpoint> LanEndpoint::create(const Params& params) {
std::shared_ptr<LanEndpoint> LanEndpoint::create(const Params& params) {
auto udp_socket = params.network_channel->createUDPSocket(params.addr);
if (udp_socket == nullptr) {
return nullptr;
}
std::unique_ptr<LanEndpoint> ep{new LanEndpoint(std::move(udp_socket), params.network_channel,
uint16_t port = udp_socket->port();
std::shared_ptr<LanEndpoint> ep{new LanEndpoint(std::move(udp_socket), params.network_channel,
params.on_connected, params.on_read)};
ep->init();
EndpointInfo info{};
info.address = params.addr;
info.address.set_port(port);
info.type = EndpointType::Lan;
ep->set_local_info(info);
params.on_endpoint_info(info);
@@ -66,23 +69,23 @@ void LanEndpoint::on_binding_request(const StunMessage& msg, const Address& remo
const int64_t& packet_time_us) {
(void)msg;
(void)packet_time_us;
// 此处只验证对方IP:Port不浪费更多资源因为上面还有一层DTLS验证
// 后面如果确实想要在此处做进一步验证可以用传入的p2p_username和p2p_password做HMAC校验
LOG(INFO) << "on_binding_request";
if (remote_addr != remote_info().address) {
return;
}
set_received_request();
send_binding_response(remote_addr, msg.id());
}
void LanEndpoint::on_binding_response(const StunMessage& msg, const Address& remote_addr,
const int64_t& packet_time_us) {
(void)msg;
(void)packet_time_us;
// 此处只验证对方IP:Port不浪费更多资源因为上面还有一层DTLS验证
// 后面如果确实想要在此处做进一步验证可以用传入的p2p_username和p2p_password做HMAC校验
LOG(INFO) << "on_binding_response";
if (remote_addr != remote_info().address) {
return;
}
set_received_response();
}
} // namespace rtc2

View File

@@ -44,7 +44,7 @@ public:
};
public:
static std::unique_ptr<LanEndpoint> create(const Params& params);
static std::shared_ptr<LanEndpoint> create(const Params& params);
int32_t send(std::vector<std::span<const uint8_t>> spans) override;
EndpointType type() const override;

View File

@@ -37,11 +37,12 @@
#else
#endif
#include <ltlib/logging.h>
namespace rtc2 {
#if defined(LT_WINDOWS)
// 后面试试libuv不搞那么多#if
Address getNetcardAddress() {
Address result{};
ULONG flags = (GAA_FLAG_SKIP_DNS_SERVER | GAA_FLAG_SKIP_ANYCAST | GAA_FLAG_SKIP_MULTICAST |
@@ -69,10 +70,8 @@ Address getNetcardAddress() {
}
PIP_ADAPTER_UNICAST_ADDRESS address = adapters->FirstUnicastAddress;
if (address != nullptr) {
sockaddr_in* addr = reinterpret_cast<sockaddr_in*>(address->Address.lpSockaddr);
sockaddr_storage storage{};
memcpy(&storage, addr, sizeof(sockaddr_in));
result = Address::from_storage(storage);
sockaddr* addr = reinterpret_cast<sockaddr*>(address->Address.lpSockaddr);
result = Address::from_sockaddr(addr);
return result;
}
adapters = adapters->Next;

View File

@@ -120,6 +120,7 @@ void P2P::post_delayed_task(uint32_t delayed_ms, const std::function<void()>& ta
}
void P2P::create_lan_endpoint() {
LOG(INFO) << "create_lan_endpoint";
Address netcard_addr = getNetcardAddress();
if (netcard_addr.family() == -1) {
LOG(WARNING) << "getNetcardAddress failed, no NIC";

View File

@@ -81,6 +81,10 @@ const uint8_t* StunMessage::data() const {
return msg_->data();
}
std::vector<uint8_t> StunMessage::id() const {
return msg_->id();
}
uint8_t* StunMessage::data() {
return msg_->data();
}

View File

@@ -33,6 +33,7 @@
#include <memory>
#include <optional>
#include <vector>
#include <modules/network/address.h>
@@ -60,6 +61,7 @@ public:
bool verify() const;
Type type() const;
const uint8_t* data() const;
std::vector<uint8_t> id() const;
uint8_t* data();
size_t size() const;
std::optional<Address> mapped_address() const;

View File

@@ -641,6 +641,12 @@ public:
uint8_t* data() { return buffer_.data(); }
const uint8_t* data() const { return buffer_.data(); }
std::vector<uint8_t> id() const {
std::vector<uint8_t> tid(12);
memcpy(tid.data(), hdr()->tsx_id, 12);
return tid;
}
size_t size() const { return stun_msg_len(hdr()); }
bool verify() const { return stun_msg_verify(hdr(), capacity()) == 0 ? false : true; }

View File

@@ -76,6 +76,10 @@ std::unique_ptr<Client> Client::create(const Params& params) {
conn_params.is_server = false;
conn_params.key_and_cert = params.key_and_cert;
conn_params.remote_digest = params.remote_digest;
conn_params.on_signaling_message =
[cb = params.on_signaling_message](const std::string& key, const std::string& value) {
cb(key.c_str(), value.c_str());
};
//
auto conn = Connection::create(conn_params);
if (conn == nullptr) {
@@ -132,9 +136,13 @@ std::unique_ptr<Server> Server::create(const Params& params) {
data_param.on_data = params.on_data;
conn_params.data = data_param;
// others
conn_params.is_server = false;
conn_params.is_server = true;
conn_params.key_and_cert = params.key_and_cert;
conn_params.remote_digest = params.remote_digest;
conn_params.on_signaling_message =
[cb = params.on_signaling_message](const std::string& key, const std::string& value) {
cb(key.c_str(), value.c_str());
};
//
auto conn = Connection::create(conn_params);
if (conn == nullptr) {