• 注册
  • WordPress后台-外观-小工具 进行配置小工具

    swoole swoole 关注:1 内容:9

    基于swoole的task功能实现程序内的map-reduce

  • 查看作者
  • 打赏作者
  • 拉黑名单
    • swoole
    • Swoole扩展自带的Task进程功能非常强大,可以用来实现各种复杂的业务逻辑。本文主要介绍使用task/finish功能实现程序内的Map-Reduce并发任务处理。一个聊天服务经常会有群聊需求,我的群组和群组内成员,另外群组内成员需要按照积分排序,类似与这样的功能就可以使用Swoole简单实现。
      
      传统多线程方案
      创建2个全局变量Map,group_map以group_id为Key,存储成员set。user_map以uid为Key存储当前用户加入的所有group。
      
      多线程环境下实际上不能直接操作这2个Map,必须要加锁。当添加用户到一个组或者用户退出一个组时需要操作这2个map,必须要加锁。如果操作很频繁,实际上锁的碰撞是很严重的,这部分操作就会变成串行的。同时只有一个线程可以对map进行操作。锁的争抢也会带来大量线程切换浪费很多CPU资源。
      
      lock.lock();
      group_map[group_id].append([uid, score]);
      user_map[uid].append(group_id);
      group_map.sortByScore();
      lock.unlock();
      基于Swoole的Task功能
      基于Swoole的Task功能,可以将任务切片,然后hash投递到不同的Task进程,完成任务。排序功能可以直接使用PHP提供的SplHeap实现,时间复杂度为O(logn),如果要实现查询功能,如根据UID查询用户加入的所有群组,根据GroupId查询有哪些成员。可以先计算Hash找到对应Task进程,然后通过task/taskwait发送指令,直接读取进程的变量查找到信息。
      
      $serv->set(array("task_worker_num" => 24));
      
      $serv->task(array("cmd" => "user", "uid" => $uid, "gid" => $gid, "score" => $score), $gid % $task_worker_num);
      $serv->task(array("cmd" => "group", "uid" => $uid, "gid" => $gid), $uid % $task_worker_num);
      
      class MyMaxHeap extends SplHeap
      {
          public function compare($value1, $value2)
          {
              return ($value1['score'] - $value2['score']);
          }
      }
      
      function onTask($serv, $taskId, $srcWorkerId, $data) {
          static $userMap = array();
          static $groupMap = array();
          
          if ($data['cmd'] == 'group')
          {
              if (!isset($groupMap[$data['gid']]))
              {
                  $groupMap[$data['gid']] = new MyMaxHeap();
              }
              $heap = $groupMap[$data['gid']];
              $heap->insert(array("uid" => $data['uid'], "score" => $data['score']));
          }
          elseif ($data['cmd'] == 'user')
          {
              $userMap[$data['uid']][] = $data['gid'];
          }
      }
      由于Task进程只有数组操作,所以是非阻塞的,只需要开启与CPU核数相同的进程数量即可。进程间无任何加锁争抢,性能非常好。Swoole的Task进程通信使用UnixSocket,是内核提供的全内存通信方式无任何IO,一写一读单进程可达100万/秒。虽然没有直接读变量的速度快,但性能也足够了。

      请登录之后再进行评论

      登录
    • 实时动态
    • 偏好设置
    • 返回顶部
    • 帖子间隔 侧栏位置: