Thread management

pull/12/head
Ken 2020-12-09 20:52:08 -05:00
parent 254205ce21
commit 5b3e4d10b7
15 changed files with 438 additions and 143 deletions

View File

@ -10,6 +10,8 @@ executable("tincan") {
"trunk/src/tap_frame.cc", "trunk/src/tap_frame.cc",
"trunk/src/single_link_tunnel.cc", "trunk/src/single_link_tunnel.cc",
"trunk/src/basic_tunnel.cc", "trunk/src/basic_tunnel.cc",
"trunk/src/tunnel_threads.cc",
"trunk/src/async_io.cc",
] ]
include_dirs = [ include_dirs = [
"trunk/include", "trunk/include",
@ -50,3 +52,4 @@ executable("tincan") {
} }
} }

View File

@ -56,6 +56,7 @@ struct AsyncIo
ZeroMemory(this, sizeof(OVERLAPPED)); ZeroMemory(this, sizeof(OVERLAPPED));
#endif // defined(_TNC_WIN) #endif // defined(_TNC_WIN)
} }
AsyncIo( AsyncIo(
uint8_t* buffer, uint8_t* buffer,
uint32_t bytes_to_transfer, uint32_t bytes_to_transfer,
@ -73,79 +74,35 @@ struct AsyncIo
ZeroMemory(this, sizeof(OVERLAPPED)); ZeroMemory(this, sizeof(OVERLAPPED));
#endif // defined(_TNC_WIN) #endif // defined(_TNC_WIN)
} }
AsyncIo(AsyncIo & rhs);
#if !defined(_TNC_WIN) #if !defined(_TNC_WIN)
virtual ~AsyncIo() = default; virtual ~AsyncIo() = default;
#endif // defined(_TNC_WIN) #endif // defined(_TNC_WIN)
void Initialize( void Initialize(
uint8_t* buffer_to_transfer, uint8_t* buffer_to_transfer,
uint32_t bytes_to_transfer, uint32_t bytes_to_transfer,
void* context = nullptr, void* context,
AIO_OP flags = AIO_READ, AIO_OP flags,
uint32_t bytes_transferred = 0) uint32_t bytes_transferred);
{ AsyncIo &operator= (const AsyncIo & rhs);
buffer_to_transfer_ = buffer_to_transfer; bool operator==(const AsyncIo & rhs) const;
bytes_to_transfer_ = bytes_to_transfer; bool operator!=(const AsyncIo & rhs) const;
context_ = context; void BufferToTransfer(uint8_t* val);
flags_ = flags; uint8_t* BufferToTransfer();
bytes_transferred_ = bytes_transferred; void BytesToTransfer(uint32_t val);
} uint32_t BytesToTransfer();
void BytesTransferred(uint32_t val);
void BufferToTransfer(uint8_t* val) uint32_t BytesTransferred();
{ void Context(void * val);
buffer_to_transfer_ = val; void * Context();
} bool IsRead();
uint8_t* BufferToTransfer() void SetReadOp();
{ bool IsWrite();
return buffer_to_transfer_; void SetWriteOp();
} bool IsGood();
void BytesToTransfer(uint32_t val)
{
bytes_to_transfer_ = val;
}
uint32_t BytesToTransfer()
{
return bytes_to_transfer_;
}
void BytesTransferred(uint32_t val)
{
bytes_transferred_ = val;
}
uint32_t BytesTransferred()
{
return bytes_transferred_;
}
void Context(void * val)
{
context_ = val;
}
void * Context()
{
return context_;
}
bool IsRead()
{
return flags_ == AIO_READ;
}
void SetReadOp()
{
flags_ = AIO_READ;
}
bool IsWrite()
{
return flags_ == AIO_WRITE;
}
void SetWriteOp()
{
flags_ = AIO_WRITE;
}
bool IsGood()
{
return good_;
}
uint8_t * buffer_to_transfer_; uint8_t * buffer_to_transfer_;
void * context_; void * context_;
uint32_t bytes_to_transfer_; uint32_t bytes_to_transfer_;

View File

@ -30,7 +30,6 @@
#undef max #undef max
#endif // #endif //
#include "rtc_base/ssl_identity.h" #include "rtc_base/ssl_identity.h"
#include "rtc_base/thread.h"
#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/strings/json.h" #include "rtc_base/strings/json.h"
#include "async_io.h" #include "async_io.h"
@ -40,7 +39,7 @@
#include "tincan_exception.h" #include "tincan_exception.h"
#include "tunnel_descriptor.h" #include "tunnel_descriptor.h"
#include "virtual_link.h" #include "virtual_link.h"
#include "tunnel_threads.h"
namespace tincan namespace tincan
{ {
class BasicTunnel : class BasicTunnel :
@ -56,6 +55,10 @@ public:
MSGID_FWD_FRAME, MSGID_FWD_FRAME,
MSGID_FWD_FRAME_RD, MSGID_FWD_FRAME_RD,
MSGID_DISC_LINK, MSGID_DISC_LINK,
MSGID_TAP_READ,
MSGID_TAP_WRITE,
MSGID_TAP_UP,
MSGID_TAP_DOWN
}; };
class TransmitMsgData : public MessageData class TransmitMsgData : public MessageData
{ {
@ -82,9 +85,19 @@ public:
~LinkMsgData() = default; ~LinkMsgData() = default;
}; };
class TapMessageData : public MessageData
{
public:
unique_ptr<AsyncIo> aio_;
TapMessageData(unique_ptr<AsyncIo> aio) : aio_(move(aio))
{}
~TapMessageData() = default;
};
BasicTunnel( BasicTunnel(
unique_ptr<TunnelDescriptor> descriptor, unique_ptr<TunnelDescriptor> descriptor,
ControllerLink * ctrl_handle); ControllerLink * ctrl_handle,
TunnelThreads *thread_pool);
virtual ~BasicTunnel(); virtual ~BasicTunnel();
@ -167,15 +180,16 @@ protected:
string vlink_id); string vlink_id);
virtual void VLinkDown( virtual void VLinkDown(
string vlink_id); string vlink_id);
rtc::Thread* SignalThread();
rtc::Thread* NetworkThread();
rtc::Thread* TapThread();
unique_ptr<TapDev> tdev_; unique_ptr<TapDev> tdev_;
unique_ptr<TapDescriptor> tap_desc_; unique_ptr<TapDescriptor> tap_desc_;
unique_ptr<TunnelDescriptor> descriptor_; unique_ptr<TunnelDescriptor> descriptor_;
//shared_ptr<ControllerLink> ctrl_link_;
ControllerLink * ctrl_link_; ControllerLink * ctrl_link_;
unique_ptr<rtc::SSLIdentity> sslid_; unique_ptr<rtc::SSLIdentity> sslid_;
unique_ptr<rtc::SSLFingerprint> local_fingerprint_; unique_ptr<rtc::SSLFingerprint> local_fingerprint_;
rtc::Thread* net_worker_; TunnelThreads *thread_pool_;
rtc::Thread* sig_worker_;
rtc::BasicNetworkManager net_manager_; rtc::BasicNetworkManager net_manager_;
}; };
} // namespace tincan } // namespace tincan

View File

@ -65,7 +65,7 @@ public:
const TapDescriptor & tap_desc) override; const TapDescriptor & tap_desc) override;
void Close() override; void Close() override;
uint32_t Read(AsyncIo& aio_rd) override; uint32_t Read(AsyncIo& aio_rd) override;
uint32_t Write(AsyncIo& aio_wr) override; uint32_t Write(unique_ptr<AsyncIo> aio_wr) override;
uint16_t Mtu() override; uint16_t Mtu() override;
void Up() override; void Up() override;
void Down() override; void Down() override;
@ -84,6 +84,7 @@ private:
bool is_good_; bool is_good_;
void SetFlags(short a, short b); void SetFlags(short a, short b);
void PlenToIpv4Mask(unsigned int a, struct sockaddr *b); void PlenToIpv4Mask(unsigned int a, struct sockaddr *b);
int FileDesc();
}; };
} }
} }

View File

@ -33,7 +33,8 @@ class SingleLinkTunnel :
public: public:
SingleLinkTunnel( SingleLinkTunnel(
unique_ptr<TunnelDescriptor> descriptor, unique_ptr<TunnelDescriptor> descriptor,
ControllerLink * ctrl_handle); ControllerLink * ctrl_handle,
TunnelThreads *thread_pool);
virtual ~SingleLinkTunnel() = default; virtual ~SingleLinkTunnel() = default;
shared_ptr<VirtualLink> CreateVlink( shared_ptr<VirtualLink> CreateVlink(

View File

@ -72,7 +72,7 @@ public:
AsyncIo & aio_rd) = 0; AsyncIo & aio_rd) = 0;
virtual uint32_t Write( virtual uint32_t Write(
AsyncIo & aio_wr) = 0; unique_ptr<AsyncIo> aio_wr) = 0;
virtual MacAddressType MacAddress() = 0; virtual MacAddressType MacAddress() = 0;

View File

@ -28,6 +28,7 @@
#include "control_listener.h" #include "control_listener.h"
#include "control_dispatch.h" #include "control_dispatch.h"
#include "single_link_tunnel.h" #include "single_link_tunnel.h"
#include "tunnel_threads.h"
namespace tincan { namespace tincan {
class Tincan : class Tincan :
@ -108,7 +109,7 @@ private:
std::mutex tunnels_mutex_; std::mutex tunnels_mutex_;
std::mutex inprogess_controls_mutex_; std::mutex inprogess_controls_mutex_;
rtc::Event exit_event_; rtc::Event exit_event_;
TunnelThreads thread_pool_;
}; };
} //namespace tincan } //namespace tincan
#endif //TINCAN_TINCAN_H_ #endif //TINCAN_TINCAN_H_

View File

@ -0,0 +1,45 @@
/*
* EdgeVPNio
* Copyright 2020, University of Florida
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef TINCAN_TUNNEL_THREADS_H_
#define TINCAN_TUNNEL_THREADS_H_
#include "tincan_base.h"
#include "rtc_base/thread.h"
namespace tincan
{
class TunnelThreads {
public:
TunnelThreads();
TunnelThreads(TunnelThreads& rhs) = delete;
TunnelThreads& operator=(const TunnelThreads&) = delete;
~TunnelThreads();
// <signal, network>
std::pair<rtc::Thread*, rtc::Thread*>LinkThreads();
rtc::Thread* TapThread();
private:
rtc::Thread signal_thread_;
rtc::Thread network_thread_;
rtc::Thread tap_thread_;
static unsigned int num_;
};
} // namespace tincan
#endif // TINCAN_TUNNEL_THREADS_H_

View File

@ -0,0 +1,160 @@
/*
* EdgeVPNio
* Copyright 2020, University of Florida
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "async_io.h"
#include "tincan_exception.h"
namespace tincan
{
AsyncIo::AsyncIo(AsyncIo & rhs)
{
*this = rhs;
}
AsyncIo &
AsyncIo::operator= (const AsyncIo & rhs)
{
if (&rhs != this){
this->buffer_to_transfer_ = rhs.buffer_to_transfer_;
this->context_ = rhs.context_;
this->bytes_to_transfer_ = rhs.bytes_to_transfer_;
this->bytes_transferred_ = rhs.bytes_transferred_;
this->flags_ = rhs.flags_;
this->good_ = rhs.good_;
}
return *this;
}
bool
AsyncIo::operator==(
const AsyncIo & rhs) const
{
return (
this == &rhs ||
this->buffer_to_transfer_ == rhs.buffer_to_transfer_ ||
this->context_ == rhs.context_ ||
this->bytes_to_transfer_ == rhs.bytes_to_transfer_ ||
this->bytes_transferred_ == rhs.bytes_transferred_ ||
this->flags_ == rhs.flags_ ||
this->good_ == rhs.good_);
}
bool
AsyncIo::operator!=(
const AsyncIo & rhs) const
{
return !(*this == rhs);
}
void
AsyncIo::Initialize(
uint8_t* buffer_to_transfer,
uint32_t bytes_to_transfer,
void* context = nullptr,
AIO_OP flags = AIO_READ,
uint32_t bytes_transferred = 0)
{
buffer_to_transfer_ = buffer_to_transfer;
bytes_to_transfer_ = bytes_to_transfer;
context_ = context;
flags_ = flags;
bytes_transferred_ = bytes_transferred;
}
void
AsyncIo::BufferToTransfer(uint8_t* val)
{
buffer_to_transfer_ = val;
}
uint8_t*
AsyncIo::BufferToTransfer()
{
return buffer_to_transfer_;
}
void
AsyncIo::BytesToTransfer(uint32_t val)
{
bytes_to_transfer_ = val;
}
uint32_t
AsyncIo::BytesToTransfer()
{
return bytes_to_transfer_;
}
void
AsyncIo::BytesTransferred(uint32_t val)
{
bytes_transferred_ = val;
}
uint32_t
AsyncIo::BytesTransferred()
{
return bytes_transferred_;
}
void
AsyncIo::Context(void * val)
{
context_ = val;
}
void *
AsyncIo::Context()
{
return context_;
}
bool
AsyncIo::IsRead()
{
return flags_ == AIO_READ;
}
void
AsyncIo::SetReadOp()
{
flags_ = AIO_READ;
}
bool
AsyncIo::IsWrite()
{
return flags_ == AIO_WRITE;
}
void
AsyncIo::SetWriteOp()
{
flags_ = AIO_WRITE;
}
bool
AsyncIo::IsGood()
{
return good_;
}
} //tincan

View File

@ -28,21 +28,18 @@ namespace tincan
extern TincanParameters tp; extern TincanParameters tp;
BasicTunnel::BasicTunnel( BasicTunnel::BasicTunnel(
unique_ptr<TunnelDescriptor> descriptor, unique_ptr<TunnelDescriptor> descriptor,
ControllerLink * ctrl_handle) : ControllerLink * ctrl_handle,
TunnelThreads *thread_pool) :
tdev_(nullptr), tdev_(nullptr),
descriptor_(move(descriptor)), descriptor_(move(descriptor)),
ctrl_link_(ctrl_handle) ctrl_link_(ctrl_handle),
thread_pool_(thread_pool)
{ {
tdev_ = make_unique<TapDev>(); tdev_ = make_unique<TapDev>();
net_worker_ = new Thread(SocketServer::CreateDefault());
sig_worker_ = new Thread(SocketServer::CreateDefault());
} }
BasicTunnel::~BasicTunnel() BasicTunnel::~BasicTunnel()
{ {}
delete net_worker_;
delete sig_worker_;
}
void void
BasicTunnel::Configure( BasicTunnel::Configure(
@ -51,7 +48,14 @@ BasicTunnel::Configure(
{ {
tap_desc_ = move(tap_desc); tap_desc_ = move(tap_desc);
//initialize the Tap Device //initialize the Tap Device
tdev_->Open(*tap_desc_.get()); if (!TapThread()->IsCurrent()) {
TapThread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(TapThread());
tdev_->Open(*tap_desc_.get());
});
} else {
tdev_->Open(*tap_desc_.get());
}
//create X509 identity for secure connections //create X509 identity for secure connections
string sslid_name = descriptor_->node_id + descriptor_->uid; string sslid_name = descriptor_->node_id + descriptor_->uid;
sslid_ = rtc::SSLIdentity::Create(sslid_name, rtc::KT_RSA); sslid_ = rtc::SSLIdentity::Create(sslid_name, rtc::KT_RSA);
@ -66,8 +70,6 @@ BasicTunnel::Configure(
void void
BasicTunnel::Start() BasicTunnel::Start()
{ {
net_worker_->Start();
sig_worker_->Start();
tdev_->read_completion_.connect(this, &BasicTunnel::TapReadComplete); tdev_->read_completion_.connect(this, &BasicTunnel::TapReadComplete);
tdev_->write_completion_.connect(this, &BasicTunnel::TapWriteComplete); tdev_->write_completion_.connect(this, &BasicTunnel::TapWriteComplete);
} }
@ -75,12 +77,29 @@ BasicTunnel::Start()
void void
BasicTunnel::Shutdown() BasicTunnel::Shutdown()
{ {
net_worker_->Quit(); if (!TapThread()->IsCurrent()) {
sig_worker_->Quit(); TapThread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(TapThread());
Shutdown();
});
return;
}
tdev_->Down(); tdev_->Down();
tdev_->Close(); tdev_->Close();
} }
rtc::Thread* BasicTunnel::SignalThread(){
return thread_pool_->LinkThreads().first;
}
rtc::Thread* BasicTunnel::NetworkThread(){
return thread_pool_->LinkThreads().second;
}
rtc::Thread* BasicTunnel::TapThread(){
return thread_pool_->TapThread();
}
unique_ptr<VirtualLink> unique_ptr<VirtualLink>
BasicTunnel::CreateVlink( BasicTunnel::CreateVlink(
unique_ptr<VlinkDescriptor> vlink_desc, unique_ptr<VlinkDescriptor> vlink_desc,
@ -90,7 +109,7 @@ BasicTunnel::CreateVlink(
vlink_desc->stun_servers = descriptor_->stun_servers; vlink_desc->stun_servers = descriptor_->stun_servers;
vlink_desc->turn_descs = descriptor_->turn_descs; vlink_desc->turn_descs = descriptor_->turn_descs;
unique_ptr<VirtualLink> vl = make_unique<VirtualLink>( unique_ptr<VirtualLink> vl = make_unique<VirtualLink>(
move(vlink_desc), move(peer_desc), sig_worker_, net_worker_); move(vlink_desc), move(peer_desc), SignalThread(), NetworkThread());
unique_ptr<SSLIdentity> sslid_copy(sslid_->Clone()); unique_ptr<SSLIdentity> sslid_copy(sslid_->Clone());
vl->Initialize(net_manager_, move(sslid_copy), vl->Initialize(net_manager_, move(sslid_copy),
make_unique<rtc::SSLFingerprint>(*local_fingerprint_.get()), make_unique<rtc::SSLFingerprint>(*local_fingerprint_.get()),
@ -123,7 +142,6 @@ void
BasicTunnel::VLinkDown( BasicTunnel::VLinkDown(
string vlink_id) string vlink_id)
{ {
//StopIo();
unique_ptr<TincanControl> ctrl = make_unique<TincanControl>(); unique_ptr<TincanControl> ctrl = make_unique<TincanControl>();
ctrl->SetControlType(TincanControl::CTTincanRequest); ctrl->SetControlType(TincanControl::CTTincanRequest);
Json::Value & req = ctrl->GetRequest(); Json::Value & req = ctrl->GetRequest();
@ -237,6 +255,14 @@ void BasicTunnel::OnMessage(Message * msg)
((LinkInfoMsgData*)msg->pdata)->msg_event.Set(); ((LinkInfoMsgData*)msg->pdata)->msg_event.Set();
} }
break; break;
case MSGID_TAP_READ:
{}
break;
case MSGID_TAP_WRITE:
{
tdev_->Write(move(((TapMessageData*)msg->pdata)->aio_));
}
break;
} }
} }
@ -261,7 +287,9 @@ BasicTunnel::InjectFame(
tf->BufferToTransfer(tf->Payload()); tf->BufferToTransfer(tf->Payload());
tf->BytesTransferred((uint32_t)len); tf->BytesTransferred((uint32_t)len);
tf->BytesToTransfer((uint32_t)len); tf->BytesToTransfer((uint32_t)len);
tdev_->Write(*tf.release()); TapMessageData *tp_ = new TapMessageData(move(tf));
TapThread()->Post(RTC_FROM_HERE, this, MSGID_TAP_WRITE, tp_);
//RTC_LOG(LS_INFO) << "Frame injected=\n" << data; //RTC_LOG(LS_INFO) << "Frame injected=\n" << data;
} }
} //namespace tincan } //namespace tincan

View File

@ -160,13 +160,15 @@ uint32_t TapDevLnx::Read(AsyncIo& aio_rd)
return 0; return 0;
} }
uint32_t TapDevLnx::Write(AsyncIo& aio_wr) uint32_t
TapDevLnx::Write(unique_ptr<AsyncIo> aio_wr)
{ {
if(!is_good_ || writer_->IsQuitting()) if(!is_good_)
return 1; //indicates a failure to setup async operation return 1; //indicates a failure to setup async operation
TapMessageData *tp_ = new TapMessageData; int nwrite = write(fd_, aio_wr->BufferToTransfer(), aio_wr->BytesToTransfer());
tp_->aio_ = &aio_wr; aio_wr->good_ = nwrite >= 0;
writer_->Post(RTC_FROM_HERE, this, MSGID_WRITE, tp_); aio_wr->BytesTransferred(nwrite);
write_completion_(aio_wr.release());
return 0; return 0;
} }
@ -186,11 +188,11 @@ void TapDevLnx::Up()
return; return;
is_good_ = true; is_good_ = true;
SetFlags(IFF_UP, 0); SetFlags(IFF_UP, 0);
if (writer_) // if (writer_)
{ // {
writer_->Quit(); // writer_->Quit();
writer_.reset(); // writer_.reset();
} // }
if (reader_) if (reader_)
{ {
reader_->Quit(); reader_->Quit();
@ -198,19 +200,19 @@ void TapDevLnx::Up()
} }
reader_ = make_unique<rtc::Thread>(SocketServer::CreateDefault()); reader_ = make_unique<rtc::Thread>(SocketServer::CreateDefault());
reader_->Start(); reader_->Start();
writer_ = make_unique<rtc::Thread>(SocketServer::CreateDefault()); // writer_ = make_unique<rtc::Thread>(SocketServer::CreateDefault());
writer_->Start(); // writer_->Start();
} }
void TapDevLnx::Down() void TapDevLnx::Down()
{ {
is_good_ = false; is_good_ = false;
if(writer_) // if(writer_)
writer_->Quit(); // writer_->Quit();
if(reader_) if(reader_)
reader_->Quit(); reader_->Quit();
reader_.reset(); reader_.reset();
writer_.reset(); //writer_.reset();
SetFlags(0, IFF_UP); SetFlags(0, IFF_UP);
RTC_LOG(LS_INFO) << "TAP device state set to DOWN"; RTC_LOG(LS_INFO) << "TAP device state set to DOWN";
@ -237,23 +239,23 @@ void TapDevLnx::OnMessage(Message * msg)
} }
} }
break; break;
case MSGID_WRITE: // case MSGID_WRITE:
{ // {
AsyncIo* aio_write = ((TapMessageData*)msg->pdata)->aio_; // AsyncIo* aio_write = ((TapMessageData*)msg->pdata)->aio_;
int nwrite = write(fd_, aio_write->BufferToTransfer(), aio_write->BytesToTransfer()); // int nwrite = write(fd_, aio_write->BufferToTransfer(), aio_write->BytesToTransfer());
if(nwrite < 0) // if(nwrite < 0)
{ // {
RTC_LOG(LS_WARNING) << "A TAP Write operation failed."; // RTC_LOG(LS_WARNING) << "A TAP Write operation failed.";
aio_write->good_ = false; // aio_write->good_ = false;
} // }
else // else
{ // {
aio_write->good_ = true; // aio_write->good_ = true;
} // }
aio_write->BytesTransferred(nwrite); // aio_write->BytesTransferred(nwrite);
write_completion_(aio_write); // write_completion_(aio_write);
} // }
break; // break;
} }
delete (TapMessageData*)msg->pdata; delete (TapMessageData*)msg->pdata;
} }
@ -263,6 +265,13 @@ TapDevLnx::Ip4()
{ {
return ip4_; return ip4_;
} }
int
TapDevLnx::FileDesc()
{
return fd_;
}
} // linux } // linux
} // tincan } // tincan
#endif // _TNC_LINUX #endif // _TNC_LINUX

View File

@ -28,8 +28,9 @@ namespace tincan
{ {
SingleLinkTunnel::SingleLinkTunnel( SingleLinkTunnel::SingleLinkTunnel(
unique_ptr<TunnelDescriptor> descriptor, unique_ptr<TunnelDescriptor> descriptor,
ControllerLink * ctrl_handle) : ControllerLink * ctrl_handle,
BasicTunnel(move(descriptor), ctrl_handle) TunnelThreads *thread_pool) :
BasicTunnel(move(descriptor), ctrl_handle, thread_pool)
{} {}
shared_ptr<VirtualLink> shared_ptr<VirtualLink>
@ -105,7 +106,7 @@ void SingleLinkTunnel::QueryLinkInfo(
{ {
LinkInfoMsgData md; LinkInfoMsgData md;
md.vl = vlink_; md.vl = vlink_;
net_worker_->Post(RTC_FROM_HERE, this, MSGID_QUERY_NODE_INFO, &md); NetworkThread()->Post(RTC_FROM_HERE, this, MSGID_QUERY_NODE_INFO, &md);
md.msg_event.Wait(Event::kForever); md.msg_event.Wait(Event::kForever);
vlink_info[TincanControl::Stats].swap(md.info); vlink_info[TincanControl::Stats].swap(md.info);
vlink_info[TincanControl::Status] = "ONLINE"; vlink_info[TincanControl::Status] = "ONLINE";
@ -135,7 +136,7 @@ void SingleLinkTunnel::SendIcc(
unique_ptr<TransmitMsgData> md = make_unique<TransmitMsgData>(); unique_ptr<TransmitMsgData> md = make_unique<TransmitMsgData>();
md->frm = move(icc); md->frm = move(icc);
md->vl = vlink_; md->vl = vlink_;
net_worker_->Post(RTC_FROM_HERE, this, MSGID_SEND_ICC, md.release()); NetworkThread()->Post(RTC_FROM_HERE, this, MSGID_SEND_ICC, md.release());
} }
@ -145,7 +146,7 @@ void SingleLinkTunnel::Shutdown()
{ {
LinkInfoMsgData md; LinkInfoMsgData md;
md.vl = vlink_; md.vl = vlink_;
net_worker_->Post(RTC_FROM_HERE, this, MSGID_DISC_LINK, &md); NetworkThread()->Post(RTC_FROM_HERE, this, MSGID_DISC_LINK, &md);
md.msg_event.Wait(Event::kForever); md.msg_event.Wait(Event::kForever);
} }
vlink_.reset(); vlink_.reset();
@ -155,14 +156,24 @@ void SingleLinkTunnel::Shutdown()
void void
SingleLinkTunnel::StartIo() SingleLinkTunnel::StartIo()
{ {
tdev_->Up(); if (!TapThread()->IsCurrent()) {
TapThread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(TapThread());
tdev_->Up();
});
}
BasicTunnel::StartIo(); BasicTunnel::StartIo();
} }
void void
SingleLinkTunnel::StopIo() SingleLinkTunnel::StopIo()
{ {
tdev_->Down(); if (!TapThread()->IsCurrent()) {
TapThread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(TapThread());
tdev_->Down();
});
}
} }
void SingleLinkTunnel::RemoveLink( void SingleLinkTunnel::RemoveLink(
@ -176,7 +187,7 @@ void SingleLinkTunnel::RemoveLink(
{ {
LinkInfoMsgData md; LinkInfoMsgData md;
md.vl = vlink_; md.vl = vlink_;
net_worker_->Post(RTC_FROM_HERE, this, MSGID_DISC_LINK, &md); NetworkThread()->Post(RTC_FROM_HERE, this, MSGID_DISC_LINK, &md);
md.msg_event.Wait(Event::kForever); md.msg_event.Wait(Event::kForever);
} }
vlink_.reset(); vlink_.reset();
@ -203,7 +214,12 @@ void SingleLinkTunnel::VlinkReadComplete(
frame->BufferToTransfer(frame->Payload()); //write frame payload to TAP frame->BufferToTransfer(frame->Payload()); //write frame payload to TAP
frame->BytesToTransfer(frame->PayloadLength()); frame->BytesToTransfer(frame->PayloadLength());
frame->SetWriteOp(); frame->SetWriteOp();
tdev_->Write(*frame.release()); if (TapThread() == NetworkThread())
tdev_->Write(move(frame));
else{
TapMessageData *tp_ = new TapMessageData(move(frame));
TapThread()->Post(RTC_FROM_HERE, this, MSGID_TAP_WRITE, tp_);
}
} }
else if(fp.IsIccMsg()) else if(fp.IsIccMsg())
{ // this is an ICC message, deliver to the controller { // this is an ICC message, deliver to the controller
@ -258,15 +274,15 @@ void SingleLinkTunnel::TapReadComplete(
TransmitMsgData *md = new TransmitMsgData; TransmitMsgData *md = new TransmitMsgData;
md->frm.reset(frame); md->frm.reset(frame);
md->vl = vlink_; md->vl = vlink_;
net_worker_->Post(RTC_FROM_HERE, this, MSGID_TRANSMIT, md); NetworkThread()->Post(RTC_FROM_HERE, this, MSGID_TRANSMIT, md);
} }
} }
void SingleLinkTunnel::TapWriteComplete( void SingleLinkTunnel::TapWriteComplete(
AsyncIo * aio_wr) AsyncIo*aio_wr)
{ {
//TapFrame * frame = static_cast<TapFrame*>(aio_wr->context_); //TapFrame * frame = static_cast<TapFrame*>(aio_wr->context_);
delete static_cast<TapFrame*>(aio_wr->context_); delete static_cast<TapFrame*>(aio_wr->Context());
} }
} // end namespace tincan } // end namespace tincan

View File

@ -67,7 +67,7 @@ void Tincan::CreateTunnel(
} }
td->enable_ip_mapping = false; td->enable_ip_mapping = false;
unique_ptr<BasicTunnel> tnl; unique_ptr<BasicTunnel> tnl;
tnl = make_unique<SingleLinkTunnel>(move(td), ctrl_link_); tnl = make_unique<SingleLinkTunnel>(move(td), ctrl_link_, &thread_pool_);
unique_ptr<TapDescriptor> tap_desc = make_unique<TapDescriptor>(); unique_ptr<TapDescriptor> tap_desc = make_unique<TapDescriptor>();
tap_desc->name = tnl_desc["TapName"].asString(); tap_desc->name = tnl_desc["TapName"].asString();
tap_desc->ip4 = tnl_desc["IP4"].asString(); tap_desc->ip4 = tnl_desc["IP4"].asString();

View File

@ -0,0 +1,58 @@
/*
* EdgeVPNio
* Copyright 2020, University of Florida
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "tunnel_threads.h"
namespace tincan
{
unsigned int TunnelThreads::num_ = 0;
TunnelThreads::TunnelThreads():
signal_thread_(rtc::SocketServer::CreateDefault()),
network_thread_(rtc::SocketServer::CreateDefault()),
tap_thread_(rtc::SocketServer::CreateDefault())
{
signal_thread_.SetName("SignalThread", &num_);
signal_thread_.Start();
network_thread_.SetName("NetworkThread", &num_);
network_thread_.Start();
tap_thread_.SetName("TapThread", &num_);
tap_thread_.Start();
}
TunnelThreads::~TunnelThreads(){
signal_thread_.Quit();
network_thread_.Quit();
tap_thread_.Quit();
}
std::pair<rtc::Thread*, rtc::Thread*>
TunnelThreads::LinkThreads(){
return make_pair(&signal_thread_, &network_thread_);
}
rtc::Thread*
TunnelThreads::TapThread(){
return &tap_thread_;
}
} // namespace tincan

View File

@ -56,11 +56,13 @@ VirtualLink::VirtualLink(
VirtualLink::~VirtualLink() VirtualLink::~VirtualLink()
{ {
// port_allocator_ lives on the network thread and should be destroyed there. if (!network_thread_->IsCurrent()) {
network_thread_->Invoke<void>(RTC_FROM_HERE, [this] { // port_allocator_ lives on the network thread and should be destroyed there.
RTC_DCHECK_RUN_ON(network_thread_); network_thread_->Invoke<void>(RTC_FROM_HERE, [this] {
port_allocator_.reset(); RTC_DCHECK_RUN_ON(network_thread_);
}); port_allocator_.reset();
});
}
} }
string VirtualLink::Name() string VirtualLink::Name()