TradingView js api demo (四)websocket服务代码
2021-11-30 10:44:54
more 
1584

websocket服务代码,用的hyperf框架,需要swoole扩展,参考这个,创建websocket服务

下面是对应控制器的代码

<?php
declare(strict_types&#61;1);

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
use Hyperf\DbConnection\Db;
use Hyperf\Utils\ApplicationContext;
use function GuzzleHttp\json_decode;


/**
 * 介绍一下大概思路
 * 1,订阅redis中所有的channel
 * 2,当有链接进来,需要实时获取最新数据时,就注册到类的sublist数组里
 * 3,当redis有数据更新时,遍历sublist数组,对订阅此channel的每个链接返回数据
 * 4,取消订阅就是把这个链接从sublist数组里删除
 * 
 * 5,获取历史数据没啥好说的,查询数据库,把数据发给前端就行了
 */
class BasisModeTestController implements OnMessageInterface,OnOpenInterface,OnCloseInterface
{

    //记录订阅者
    public $sublist &#61; [];

    public function __construct()
    {   
        \Swoole\Runtime::enableCoroutine();

        $this->subscribe();
    }

    //订阅redis中的所有path
    public function subscribe()
    {
        go(function(){
            $container &#61; ApplicationContext::getContainer();
            $redis &#61; $container->get(\Hyperf\Redis\Redis::class);

            //订阅redis
            $redis->psubscribe(array('zl_*'),[$this,'callback']);
        });
    }

    //根据接收到的method参数,执行对应的方法
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {   
        $input &#61; json_decode($frame->data,true);

        if(empty($input['method'])){
            return;
        }

        $method &#61; $input['method'];

        $this->{$method}($server,$frame,$input);
    }

    public function onOpen(WebSocketServer $server, Request $request): void
    {
        echo &#34;有新链接{$request->fd}\n&#34;;
    }

    //返回指定时间内的数据
    public function KlineHistory($server,$frame,$map)
    {
        go(function()use($server,$frame,$map){
            $str  &#61; strtolower($map['data']['market']);
            $path &#61; substr($str,0,strlen($str)-3);
            $mode &#61; substr($str,-2,2);

            $startTime &#61; $map['data']['from'] * 1000;
            $endTime   &#61; $map['data']['to'] * 1000;
            
            $interval  &#61; $this->formatInterval($map['data']['resolution']);
            
            $method    &#61; $map['method'];
            
            $sql &#61; &#34;select &#96;create_time&#96;,&#96;open&#96;,&#96;close&#96;,&#96;tallest&#96;,&#96;lowest&#96; from &#96;hedge_basis_mode&#96; where &#96;interval&#96;&#61;'{$interval}' and &#96;path&#96;&#61;'{$path}' and &#96;mode&#96;&#61;'{$mode}' and (&#96;create_time&#96;>&#61;{$startTime} and &#96;create_time&#96;<&#61;{$endTime})&#34;;
            $list &#61; Db::select($sql);
            echo $sql.&#34;\n&#34;;
            $res &#61; [
                'data'&#61;>$list,
                'method'&#61;>$method,
                'name'&#61;>$map['data']['market'],
                'from'&#61;>$map['data']['from'],
                'to'&#61;>$map['data']['to'],
                'resolution'&#61;>$map['data']['resolution']
            ];

            $server->push($frame->fd,json_encode($res));
        });
    }

    //对前端传过来的时间格式转换成数据库存的时间格式
    public function formatInterval($interval)
    {
        if ($interval &#61;&#61; 5) {
            $interval &#61; '5m';
        }else if($interval &#61;&#61; 60){
            $interval &#61; '1h';
        }else if($interval &#61;&#61; '1D'){
            $interval &#61; '1d';
        }
        return $interval;
    }

    //实时更新数据
    public function KlineUpdata($server,$frame,$map)
    {
        //这里没有直接返回数据,而是添加了一个订阅者
        $this->addSubscriber($server,$frame,$map);
    }

    //移除订阅者
    public function removeSubscriber($server,$frame,$map)
    {
        $map['id'] &#61; strtolower($map['id']);
        echo &#34;取消{$frame->fd}下的{$map['id']}\n&#34;;
        foreach ($this->sublist as $redisKey&#61;>$list) {
            foreach ($list as $index &#61;> $sub) {
                if($sub['id'] &#61;&#61; $map['id'] && ($sub['frame'])->fd &#61;&#61; $frame->fd){
                    echo &#34;取消{$map['id']}成功\n&#34;;
                    unset($this->sublist[$redisKey][$index]);
                }
            }
        }
    }

    //添加订阅者
    public function addSubscriber($server,$frame,$map)
    {
        $str  &#61; strtolower($map['data']['market']);
        $path &#61; substr($str,0,strlen($str)-3);
        $mode &#61; substr($str,-2,2);
        $interval  &#61; $this->formatInterval($map['data']['resolution']);
        //整理redis订阅的key
        $redisKey &#61; &#34;zl_{$mode}_{$interval}_{$path}&#34;;

        $this->sublist[$redisKey][] &#61; [
            'server'&#61;>$server,
            'frame'&#61;>$frame,
            'id'&#61;>strtolower($map['data']['id'])
        ];
    }

    public function callback($redis, $pattern, $channel, $message)
    {
        if(empty($this->sublist[$channel])){
            return;
        }

        $row &#61; json_decode(str_replace('\'','&#34;',trim($message,&#34;'&#34;)),true);

         //实时传送到前端
         $list &#61; [
            'time'    &#61;> $row['time'],
            'open'    &#61;> $row['open'],
            'close'   &#61;> $row['close'],
            'tallest' &#61;> $row['tallest'],
            'lowest'  &#61;> $row['lowest']
        ];
        
        //给每个订阅此path的订阅者发送消息
        foreach ($this->sublist[$channel] as $sub) {
            ($sub['server'])->push(($sub['frame'])->fd,json_encode(
                $res &#61; [
                    'data'&#61;>$list,
                    'method'&#61;>'KlineUpdata',
                    'id'&#61;>$sub['id']
                ]
            ));
        }
    }

    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        echo &#34;{$fd}--链接关闭\n&#34;;
    }
}

 

 

Statement:
The content of this article does not represent the views of fxgecko website. The content is for reference only and does not constitute investment suggestions. Investment is risky, so you should be careful in your choice! If it involves content, copyright and other issues, please contact us and we will make adjustments at the first time!

Related News

您正在访问的是FxGecko网站。 FxGecko互联网及其移动端产品是中国香港特别行政区成立的Hitorank Co.,LIMITED旗下运营和管理的一款面向全球发行的企业资讯査询工具。

您的IP为 中国大陆地区,抱歉的通知您,不能为您提供查询服务,还请谅解。请遵守当地地法律。