当前位置 博文首页 > wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Tcp
TcpConnection
是TCP网络库中最核心、最复杂的类,也是唯一使用shared_ptr
管理的类,封装了一次TCP连接,但是它不能发起连接。当TcpServer
和TcpCliet
成功建立连接后,都会通过TcpConnection对象进行收发数据通信。
TcpServer新建连接的函数相关调用时序见下图,其中cpServer左侧是等待连接的过程。TcpServer成员Acceptor
持有一个listening socket
,封装成channel并注册到EvenetLoop的Poller,等到新客户端连接,Poller监听到IO事件,派发到channel,执行Acceptor注册的回调,返回已连接的socket和对端ip和端口给TcpServer,进而创建一个表示当前连接的TcpConnection。TcpConnection
将已连接的socket封装成channel,注册到EventLoop中的Poller,监听其可读、可写、关闭、出错等IO事件。多个tcp连接会保存在TcpServer的TcpConnection map结构中。
muduo只有一种方式断开连接:被动关闭,即客户端发起关闭连接。已连接的socket fd在时间循环中触发read返回0的事件被channel接受处理,调用TcpServer注册的removeConn()
回调函数执行关闭。TcpServer首先从连接的TcpConnection map中删除当前连接,接着在IO线程中调用TcpConnection::connectDestroyed()
函数,通过channel设置不再关注IO事件,调用服务端断连回调,最后从EventLoop移除channel、Poller中注销。
TcpConnection是muduo库中唯一使用shared_ptr来管理的类,因为它的生命周期模糊。当建立连接后,TcpServer中的map持有TcpConnection的shared_ptr指针,当TcpConnetion从TcpServer的连接map中erase()移除后,TcpConnetion的引用计数减一。如果用户不持有TcpConnection的shared_ptr指针,那么在调用TcpConnection::connectDestroyed()后,会出现访问已经析构的对象。因此,在TcpConnection内部在注册回调函数时,使用bind绑定其shared_ptr调用shared_from_this()来获取,以此增加引用计数、延长生命周期。
TcpConnection中封装了InputBuffer
和OutputBuffer
,用来表示应用层的缓冲区,调用者提供要发送的数据即可,不用操心数据如何发送、是否分次发送等问题。
///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop* loop,const string& name,
int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr);
~TcpConnection();
// 获取当前TcpConnetction的一些属性、状态
EventLoop* getLoop() const { return loop_; }
const string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected; }
bool disconnected() const { return state_ == kDisconnected; }
// return true if success.
bool getTcpInfo(struct tcp_info*) const;
string getTcpInfoString() const;
/// 提供上层调用的发送接口,直接发送或保存在Buffer中
// void send(string&& message); // C++11
void send(const void* message, int len);
void send(const StringPiece& message);
// void send(Buffer&& message); // C++11
void send(Buffer* message); // this one will swap data
// 关闭连接,设置TCP选项
void shutdown(); // NOT thread safe, no simultaneous calling
// void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
void forceClose();
void forceCloseWithDelay(double seconds);
void setTcpNoDelay(bool on);
// 开启/关闭当前连接socket上的可读事件监听(通过channel传递到Poller)
void startRead();
void stopRead();
bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop
// 中间变量
void setContext(const std::any& context){ context_ = context; }
const std::any& getContext() const { return context_; }
std::any* getMutableContext() { return &context_; }
// 回调函数设置
void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb){ writeCompleteCallback_ = cb; }
// 限制发送缓冲区最大的数据堆积量,一旦超过设定值调用指定回调函数
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
/// Advanced interface 输入输出缓冲区,应用层不用关系底层处理流程
Buffer* inputBuffer() { return &inputBuffer_; }
Buffer* outputBuffer() { return &outputBuffer_; }
/// Internal use only.
void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; }
// called when TcpServer accepts a new connection
void connectEstablished(); // should be called only once
// called when TcpServer has removed me from its map
void connectDestroyed(); // should be called only once
private:
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
// void sendInLoop(string&& message);
void sendInLoop(const StringPiece& message);
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();
// void shutdownAndForceCloseInLoop(double seconds);
void forceCloseInLoop();
void setState(StateE s) { state_ = s; }
const char* stateToString() const;
void startReadInLoop(); // 开始接收可读事件
void stopReadInLoop();
EventLoop* loop_;
const string name_;
StateE state_; // FIXME: use atomic variable
bool reading_;
// 已连接的socketfd的封装Socket、Channel对象
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
// IO事件回调,要求在EventLoop中执行
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
// 底层的输入、输出缓冲区的处理
size_t highWaterMark_;
Buffer inputBuffer_;
Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
std::any context_;
// FIXME: creationTime_, lastReceiveTime_
// bytesReceived_, bytesSent_
};
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr; // 智能指针管理
每一个TcpConnection都由TcpServer创建,构造时传递所属的EvenetLoop、已连接的socketfd、本端地址、对端地址。默认连接状态是kConneting
。注册了channel上事件的回调处理函数。注意,此时channel上不关注任何IO事件。
析构时,不对socketfd进行关闭的处理,作为服务端多由外部发起执行关闭。
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)), // 已连接socketfd封装的Socket对象
channel_(new Channel(loop, sockfd)), // 构造的channel
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024) // 缓冲区数据最大64M
{
// 向Channel对象注册可读事件
channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd;
// 设置保活机制
socket_->setKeepAlive(true);
}
TcpConnection::~TcpConnection()
{
LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
<< " fd=" <<