• 注册
  • swoole 关注:0 内容:7

    Swoole 源码分析——Client模块之Send

  • 查看作者
  • 打赏作者
  • 拉黑名单
    • send 入口

      本入口函数逻辑非常简单,从 PHP 函数中获取数据 data,然后调用 connect 函数。

      static PHP_METHOD(swoole_client, send)

      {

          char *data;

          zend_size_t data_len;

          zend_long flags = 0;

      #ifdef FAST_ZPP

          ZEND_PARSE_PARAMETERS_START(1, 2)

              Z_PARAM_STRING(data, data_len)

              Z_PARAM_OPTIONAL

              Z_PARAM_LONG(flags)

          ZEND_PARSE_PARAMETERS_END();

      #else

          if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, “s|l”, &data, &data_len, &flags) == FAILURE)

          {

              return;

          }

      #endif

          swClient *cli = client_get_ptr(getThis() TSRMLS_CC);

          //clear errno

          SwooleG.error = 0;

          int ret = cli->send(cli, data, data_len, flags);

          if (ret < 0)

          {

              swoole_php_sys_error(E_WARNING, “failed to send(%d) %zd bytes.”, cli->socket->fd, data_len);

              zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL(“errCode”)-1, SwooleG.error TSRMLS_CC);

              RETVAL_FALSE;

          }

          else

          {

              RETURN_LONG(ret);

          }

      }

      swClient_tcp_send_sync 同步 TCP 客户端

      对于同步客户端来说,发送数据是通过 swConnection_send 函数来进行阻塞式调用 send,当返回的错误是 EAGAIN 的时候调用 swSocket_wait 等待 1s。

      static int swClient_tcp_send_sync(swClient *cli, char *data, int length, int flags)

      {

          int written = 0;

          int n;

          assert(length > 0);

          assert(data != NULL);

          while (written < length)

          {

              n = swConnection_send(cli->socket, data, length – written, flags);

              if (n < 0)

              {

                  if (errno == EINTR)

                  {

                      continue;

                  }

                  else if (errno == EAGAIN)

                  {

                      swSocket_wait(cli->socket->fd, 1000, SW_EVENT_WRITE);

                      continue;

                  }

                  else

                  {

                      SwooleG.error = errno;

                      return SW_ERR;

                  }

              }

              written += n;

              data += n;

          }

          return written;

      }

      swClient_tcp_send_async 异步 TCP 客户端

      由于异步客户端已经设置为非阻塞,并且加入了 reactor 的监控,因此发送数据只需要 reactor->write 函数即可。当此时的套接字不可写的时候,会自动放入 out_buff 缓冲区中。

      当 out_buffer 大于高水线时,会自动调用 onBufferFull 回调函数。

      static int swClient_tcp_send_async(swClient *cli, char *data, int length, int flags)

      {

          int n = length;

          if (cli->reactor->write(cli->reactor, cli->socket->fd, data, length) < 0)

          {

              if (SwooleG.error == SW_ERROR_OUTPUT_BUFFER_OVERFLOW)

              {

                  n = -1;

                  cli->socket->high_watermark = 1;

              }

              else

              {

                  return SW_ERR;

              }

          }

          if (cli->onBufferFull && cli->socket->out_buffer && cli->socket->high_watermark == 0

                  && cli->socket->out_buffer->length >= cli->buffer_high_watermark)

          {

              cli->socket->high_watermark = 1;

              cli->onBufferFull(cli);

          }

          return n;

      }

      swClient_udp_send UDP 客户端

      对于 UDP 客户端来说,如果 Socket 缓存区塞满,并不会像 TCP 进行等待 reactor 可写状态,而是直接返回了结果。对于异步客户端来说,仅仅是非阻塞调用 sendto 函数。

      static int swClient_udp_send(swClient *cli, char *data, int len, int flags)

      {

          int n;

          n = sendto(cli->socket->fd, data, len, 0, (struct sockaddr *) &cli->server_addr.addr, cli->server_addr.len);

          if (n < 0 || n < len)

          {

              return SW_ERR;

          }

          else

          {

              return n;

          }

      }

      sendto UDP 客户端

      类似于 send 函数,sendto 函数专门针对 UDP 客户端,与 send 函数不同的是,sendto 函数在底层套接字缓冲区塞满的时候,会调用 swSocket_wait 进行阻塞等待。

      static PHP_METHOD(swoole_client, sendto)

      {

          char* ip;

          zend_size_t ip_len;

          long port;

          char *data;

          zend_size_t len;

          if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, “sls”, &ip, &ip_len, &port, &data, &len) == FAILURE)

          {

              return;

          }

          swClient *cli = (swClient *) swoole_get_object(getThis());

          int ret;

          if (cli->type == SW_SOCK_UDP)

          {

              ret = swSocket_udp_sendto(cli->socket->fd, ip, port, data, len);

          }

          else if (cli->type == SW_SOCK_UDP6)

          {

              ret = swSocket_udp_sendto6(cli->socket->fd, ip, port, data, len);

          }

          else

          {

              swoole_php_fatal_error(E_WARNING, “only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6.”);

              RETURN_FALSE;

          }

          SW_CHECK_RETURN(ret);

      }

      int swSocket_udp_sendto(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)

      {

          struct sockaddr_in addr;

          if (inet_aton(dst_ip, &addr.sin_addr) == 0)

          {

              swWarn(“ip[%s] is invalid.”, dst_ip);

              return SW_ERR;

          }

          addr.sin_family = AF_INET;

          addr.sin_port = htons(dst_port);

          return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));

      }

      int swSocket_udp_sendto6(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)

      {

          struct sockaddr_in6 addr;

          bzero(&addr, sizeof(addr));

          if (inet_pton(AF_INET6, dst_ip, &addr.sin6_addr) < 0)

          {

              swWarn(“ip[%s] is invalid.”, dst_ip);

              return SW_ERR;

          }

          addr.sin6_port = (uint16_t) htons(dst_port);

          addr.sin6_family = AF_INET6;

          return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));

      }

      int swSocket_sendto_blocking(int fd, void *__buf, size_t __n, int flag, struct sockaddr *__addr, socklen_t __addr_len)

      {

          int n = 0;

          while (1)

          {

              n = sendto(fd, __buf, __n, flag, __addr, __addr_len);

              if (n >= 0)

              {

                  break;

              }

              else

              {

                  if (errno == EINTR)

                  {

                      continue;

                  }

                  else if (swConnection_error(errno) == SW_WAIT)

                  {

                      swSocket_wait(fd, 1000, SW_EVENT_WRITE);

                      continue;

                  }

                  else

                  {

                      break;

                  }

              }

          }

          return n;

      }

      swClient_onWrite 写就绪状态

      当 reactor 监控到套接字进入了写就绪状态时,就会调用 swClient_onWrite 函数。

      从上一章我们知道,异步客户端建立连接过程中 swoole 调用了 connect 函数,该返回 0,或者返回错误码 EINPROGRESS 都会对写就绪进行监控。无论哪种情况,swClient_onWrite 被调用就说明此时连接已经建立成功,三次握手已经完成,但是 cli->socket->active 还是 0。

      如果 cli->socket->active 为 0,说明此时异步客户端虽然建立了连接,但是还没有调用 onConnect 回调函数,因此这时要调用 execute_onConnect 函数。如果使用了 SSL 隧道加密,还要进行 SSL 握手,并且设置 _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM。

      当 active 为 1 的时候,就可以调用 swReactor_onWrite 来发送数据。

      static int swClient_onWrite(swReactor *reactor, swEvent *event)

      {

          swClient *cli = event->socket->object;

          swConnection *_socket = cli->socket;

          if (cli->socket->active)

          {

      #ifdef SW_USE_OPENSSL

              if (cli->open_ssl && _socket->ssl_state == SW_SSL_STATE_WAIT_STREAM)

              {

                  if (swClient_ssl_handshake(cli) < 0)

                  {

                      goto connect_fail;

                  }

                  else if (_socket->ssl_state == SW_SSL_STATE_READY)

                  {

                      goto connect_success;

                  }

                  else

                  {

                      if (_socket->ssl_want_read)

                      {

                          cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ);

                      }

                      return SW_OK;

                  }

              }

      #endif

              if (swReactor_onWrite(cli->reactor, event) < 0)

              {

                  return SW_ERR;

              }

              if (cli->onBufferEmpty && _socket->high_watermark && _socket->out_buffer->length <= cli->buffer_low_watermark)

              {

                  _socket->high_watermark = 0;

                  cli->onBufferEmpty(cli);

              }

              return SW_OK;

          }

          socklen_t len = sizeof(SwooleG.error);

          if (getsockopt(event->fd, SOL_SOCKET, SO_ERROR, &SwooleG.error, &len) < 0)

          {

              swWarn(“getsockopt(%d) failed. Error: %s[%d]”, event->fd, strerror(errno), errno);

              return SW_ERR;

          }

          //success

          if (SwooleG.error == 0)

          {

              //listen read event

              cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ);

              //connected

              _socket->active = 1;

      #ifdef SW_USE_OPENSSL

              if (cli->open_ssl)

              {

                  if (swClient_enable_ssl_encrypt(cli) < 0)

                  {

                      goto connect_fail;

                  }

                  if (swClient_ssl_handshake(cli) < 0)

                  {

                      goto connect_fail;

                  }

                  else

                  {

                      _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM;

                  }

                  return SW_OK;

              }

              connect_success:

      #endif

              if (cli->onConnect)

              {

                  execute_onConnect(cli);

              }

          }

          else

          {

      #ifdef SW_USE_OPENSSL

              connect_fail:

      #endif

              _socket->active = 0;

              cli->close(cli);

              if (cli->onError)

              {

                  cli->onError(cli);

              }

          }

          return SW_OK;

      }

      static sw_inline void execute_onConnect(swClient *cli)

      {

          if (cli->timer)

          {

              swTimer_del(&SwooleG.timer, cli->timer);

              cli->timer = NULL;

          }

          cli->onConnect(cli);

      }

      client_onConnect

      static void client_onConnect(swClient *cli)

      {

          zval *zobject = (zval *) cli->object;

      #ifdef SW_USE_OPENSSL

          if (cli->ssl_wait_handshake)

          {

              client_execute_callback(zobject, SW_CLIENT_CB_onSSLReady);

          }

          else

      #endif

          if (!cli->redirect)

          {

              client_execute_callback(zobject, SW_CLIENT_CB_onConnect);

          }

          else

          {

              client_callback *cb = (client_callback *) swoole_get_property(zobject, 0);

              if (!cb || !cb->onReceive)

              {

                  swoole_php_fatal_error(E_ERROR, “has no 'onReceive' callback function.”);

              }

          }

      }

      你需要登录,才能进行发帖操作
    • 单栏布局 帖子间隔 侧栏位置: