网站首页swoole

在1核CPU,1G内存,2M带宽的vps上实现2000并发无失败写入数据库的程序实现

发布时间:2017-05-25 11:23:22编辑:slayer.hover阅读(1170)

           

    VPS配置 CPU: 1核    内存:1 GB     内存:2Mbps, 用的阿里云最低配,算是垫底的了。


    部署环境:

    centos7 + openresty(nginx_lua) + redis(连接池) + swoole(php)+ mysql


    开发步骤:

    1. 部署swoole服务端,等待接收写数据信号。

    2. 使用lua脚本写主页面,使用redis连接池,把要写入的数据lpush推入redis缓存中。

    3. redis数据写入后,用lua脚本触发swoole客户端,发送写数据信号。

    4. swoole服务端接收到信号后,用PHP轮询redis队列,执行数据库写入。


    AB测压结果

    10000请求数,500并发 :

    Concurrency Level:      500
    Time taken for tests:   19.823 seconds
    Complete requests:      10000
    Failed requests:        0
    Write errors:           0
    Total transferred:      1670000 bytes
    HTML transferred:       140000 bytes
    Requests per second:    504.46 [#/sec] (mean)
    Time per request:       991.161 [ms] (mean)
    Time per request:       1.982 [ms] (mean, across all concurrent requests)
    Transfer rate:          82.27 [Kbytes/sec] received


    10000请求数,1000并发 :

    Concurrency Level:      1000
    Time taken for tests:   24.471 seconds
    Complete requests:      10000
    Failed requests:        0
    Write errors:           0
    Total transferred:      1670000 bytes
    HTML transferred:       140000 bytes
    Requests per second:    408.65 [#/sec] (mean)
    Time per request:       2447.062 [ms] (mean)
    Time per request:       2.447 [ms] (mean, across all concurrent requests)
    Transfer rate:          66.65 [Kbytes/sec] received


    10000请求数,2000并发 :

    Concurrency Level:      2000
    Time taken for tests:   54.464 seconds
    Complete requests:      10000
    Failed requests:        0
    Write errors:           0
    Total transferred:      1670000 bytes
    HTML transferred:       140000 bytes
    Requests per second:    183.61 [#/sec] (mean)
    Time per request:       10892.885 [ms] (mean)
    Time per request:       5.446 [ms] (mean, across all concurrent requests)
    Transfer rate:          29.94 [Kbytes/sec] received


    可以看到,在2000并发时,虽然吞吐量和数据传输率下降的厉害,但也并没有失败的请求数。

    数据库中也能查到数据完整的写入,缺点就是数据更新并不是实时的, 测压结束后,等swoole更新数据库还得老半天,

    开太多task写数据库, 很容易把mysql搞死。

    页面高并发,主要用到的是nginx_lua的超快处理能力。



    附带上各开发部署的主要代码:

    nginx:

    server
    {
        listen       80;
        server_name  website.com;
        index index.html index.php;
        root  /home/wwwroot/swoole;
        location /test {
            default_type 'text/html';
            content_by_lua_file /usr/local/nginx/conf/lua;
        }
    }

    lua:

    local function close_redis(red)
        if not red then
            return
        end
        local pool_max_idle_time = 10000 --毫秒  
        local pool_size = 50 --连接池大小  
        local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
        if not ok then
                ngx.say("set keepalive error : ", err)
        end
    end
    ngx.header['Content-Type']="text/html;charset=UTF-8"
    local redis = require("resty.redis")
    local red = redis:new()
    red:set_timeout(1000)
    local ip = "127.0.0.1"
    local port = 6379
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.say("connect to redis error : ", err)
        return close_redis(red)
    end
    --获取GET变量
    local cjson = require("cjson")
    local obj  = ngx.req.get_uri_args()
    local value= cjson.encode(obj)
    ok, err = red:lpush("sql", value)
    if not ok then
        ngx.say("lpush msg error : ", err)
        return close_redis(red)
    end
    close_redis(red)
    ngx.location.capture("/client.php")
    ngx.say("执行完毕!")

    swoole服务端代码

    server.php

    include_once('DB.php');
    class Server
    {
        private $serv;
        private static $db;
        private static $cache;
        public function __construct() {
            $this->serv = new swoole_server("127.0.0.1", 9501);
            $this->serv->set(array(
                'worker_num'     => 4,   //一般设置为服务器CPU数的1-4倍
                'daemonize'      => 1,  //以守护进程执行
                'max_request'     => 20000,
                'dispatch_mode'    => 1,//进程数据包分配模式 1平均分配,2按FD取摸固定分配,3抢占式分配            
                'log_file'      => "./logs/swoole.log" ,//日志
            ));
            $this->serv->on('Receive',array($this,'onReceive'));//接收到数据时回调此函数
            $this->serv->on('WorkerStart',array($this,'onWorkerStart'));
            $this->serv->on('WorkerStop',array($this,'onWorkerStop'));
            $this->serv->on('ManagerStart', function ($serv) {
                    global $argv;
                    swoole_set_process_name("php {$argv[0]}: manager");
            });
            $this->serv->start();
        }
        public function onReceive(swoole_server $serv, $fd, $from_id, $data) {
            if($data=='shutdown'){
                $serv->send($fd, 'SHUTDOWN');
                $serv->close($fd);
                $serv->shutdown();
            }elseif($data=='sql'){
                if(self::$cache->exists($data)){
                    $this->taskQueue($data, $serv->worker_id);
                }
                }
        }
        public function taskQueue($data, $worker_id) {      
          self::$db->ping(); /***判断数据库连接是否正常***/ 
          try{
            while($rows= self::$cache->rPop($data)){
            $rowsarray    =json_decode($row, TRUE);
            $result= self::$db->add('test', $rowsarray);
            if($result===FALSE)
                self::Log(date('Ymd'), "Error on {$worker_id}: ".$rows);
            }
          }catch(Exception $e){
                self::Log('queue', $e->getMessage());
            }
        } 
        public function onWorkerStart($serv, $worker_id)
        {
            global $argv;
            if ($worker_id >= 4) {
                swoole_set_process_name("php {$argv[0]}: task");
                } else {
                    swoole_set_process_name("php {$argv[0]}: worker");
                }
            if(self::$db==NULL){    
                $dbset    =    array(
                    'dsn'        =>    'mysql:host=localhost;dbname=test',
                    'name'       =>    'root',
                    'password'   =>    '123456',
                ); 
                    self::$db =   new DB($dbset);
                    self::Log(date('Ymd'), "Connect mysql on {$worker_id} at ".date('Y-m-d H:i:s'));
            }
            if(self::$cache==NULL){
                $server=array(
                    'host'=>'127.0.0.1',
                    'port'=>6379,
                );
                    self::$cache = new Redis();
                self::$cache->connect($server['host'], $server['port']);
                self::Log(date('Ymd'), "Connect redis on {$worker_id} at ".date('Y-m-d H:i:s'));
                }
        }
        public function onWorkerStop($serv, $worker_id)
        {
            if(self::$db!=NULL){
                self::$db->close();
                self::$db=NULL;
                self::Log(date('Ymd'), "Close mysql on {$worker_id} at " . date('Y-m-d H:i:s'));
            }
            if(self::$cache!=NULL){
                self::$cache->close();
                self::$cache=NULL;
                self::Log(date('Ymd'), "Close redis on {$worker_id} at " . date('Y-m-d H:i:s'));
            }
        }
        public static function Log($file="mylog", $data) {
            $file = dirname(__FILE__) . "/logs/" . $file . ".log";
            file_put_contents($file, $data."\r\n", FILE_APPEND | LOCK_EX);
        }
    }

    客户端代码

    client.php

    <?php
    $client = new swoole_client(SWOOLE_SOCK_TCP);
    if (!$client->connect('127.0.0.1', 9501, -1))
    {
        exit("connect failed. Error: {$client->errCode}\n");
    }
    $sender=$client->send("sql");
    $client->close();
    exit;


    ----------------------------redis连接池及lua脚本更新-------------------------------

    可以使用redis的upstream配置,在nginx.conf配置文件里

    upstream redispool{
        server 127.0.0.1:6379;
        keepalive    200;
    }
    server{
        location /redis{
            internal;
            set_unescape_uri $query $arg_query;
            redis2_query lpush mykey $query;
            redis2_pass    redispool;
        }
    }

    修改lua脚本为:

    ngx.header['Content-Type']="text/html;charset=UTF-8"
    --获取GET变量
    local cjson= require("cjson")
    local obj  = ngx.req.get_uri_args()
    local value= cjson.encode(obj)
    local res  = ngx.location.capture("/redis", {
                            args = {query = value}
    })
    if res.status ~= 200 then
        ngx.log(ngx.DEBUG, "Redis写入失败,capture /redis未返回成功标识.")
    end
    ngx.location.capture("/client.php")
    ngx.say("执行完毕!")
    ngx.exit(ngx.HTTP_OK);


    执行方式和原来一样, 不过可以配置主从redis结构或者使用多个redis服务器来提高并发能力。



评论