当前位置 博文首页 > 基于Golang 高并发问题的解决方案

    基于Golang 高并发问题的解决方案

    作者:iGoogle.ink 时间:2021-05-29 17:55

    Golang 高并发问题的解决

    Golang在高并发问题上,由于协程的使用,相对于其他编程语言,已经有了很大的优势,即相同的配置上,Golang可以以更低的代价处理更多的线程,同样的线程数,占用更低的资源!及时这样,只是解决了一部分问题而已,因为在每个协程里,处理逻辑还是会有问题。

    高并发时,还是要考虑服务器所能承受的最大压力,数据库读取时的io问题,连接数问题,带宽问题等等

    研究了一下并发解决方案,在此记录一下

    参考文章:Handling 1 Million Requests per Minute with Go

    地址:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

    代码如下:

    //==================================
    //  * Name:Jerry
    //  * Tel:18017448610
    //  * DateTime:2019/2/24 14:02
    //==================================
    package main
    import (
    	"github.com/lunny/log"
    	"runtime"
    	"sync"
    	"time"
    )
    //工厂模型
    type Factory struct {
    	Wg        *sync.WaitGroup //任务监控系统
    	MaxWorker int             //最大机器数
    	MaxJobs   int             //最大工作数量
    	JobQueue  chan int        //工作队列管道
    	Quit      chan bool       //是否关闭机器
    }
    //创建工厂模型
    func NewFactory(maxWorker int, wg *sync.WaitGroup) Factory {
    	return Factory{
    		Wg:        wg,                        //引用任务监控系统
    		MaxWorker: maxWorker,                 //机器数量(数量多少,根据服务器性能而定)
    		JobQueue:  make(chan int, maxWorker), //工作管道,数量大于等于机器数
    		Quit:      make(chan bool),
    	}
    }
    //设置最大订单数量
    func (f *Factory) SetMaxJobs(taskNum int) {
    	f.MaxJobs = taskNum
    }
    //开始上班
    func (f *Factory) Start() {
    	//机器开机,MaxWorker
    	for i := 0; i < f.MaxWorker; i++ {
    		//每一台机器开启后,去工作吧
    		go func() {
    			//等待下发命令
    			for {
    				select {
    				case i := <-f.JobQueue:
    					//接到工作,开工!
    					f.doWork(i)
    				case <-f.Quit:
    					log.Println("机器关机")
    					return
    				}
    			}
    		}()
    	}
    }
    //分配每个任务到管道中
    func (f *Factory) AddTask(taskNum int) {
    	//系统监控任务 +1
    	f.Wg.Add(1)
    	//分配任务到管道中
    	f.JobQueue <- taskNum
    }
    //模拟耗时工作
    func (f *Factory) doWork(taskNum int) {
    	//生产产品的工作
    	time.Sleep(200 * time.Millisecond)
    	//完成工作报告
    	f.Wg.Done()
    	//log.Println("完工:", taskNum)
    }
    //创建工厂
    func Begin() {
    	//配置工作核数
    	gomaxprocs := runtime.GOMAXPROCS(runtime.NumCPU())
    	log.Println("核数:", gomaxprocs)
    	//配置监控系统
    	wg := new(sync.WaitGroup)
    	//开工厂
    	factory := NewFactory(1000, wg)
    	//订单量
    	factory.SetMaxJobs(10000)
    	//开始上班
    	factory.Start()
    	log.Println("开始生产")
    	//讲所有的订单,添加到任务队列
    	for i := 0; i < factory.MaxJobs; i++ {
    		factory.AddTask(i)
    	}
    	factory.Wg.Wait()
    	log.Println("所有订单任务生产完成")
    }
    

    测试代码及结果

    上面代码中,MaxWorker的数量很重要,取决于服务器所能承受的压力,当然也不能无限增大,合理数值效率最高(具体多少合适,自己测试)

    代码:

    func Benchmark_Begin(b *testing.B) {
     Begin()
    }

    结果:

    1000台机器(协程),10000的工作量,我的个人PC测试结果如下:

    2019/02/26 16:42:31 核数: 4

    2019/02/26 16:42:31 开始生产

    2019/02/26 16:42:33 所有订单任务生产完成

    goos: windows

    goarch: amd64

    pkg: day11

    Benchmark_hight2-4 1 2035574000 ns/op

    PASS

    Process finished with exit code 0

    总结:

    此方法仅仅是在代码层面解决一定的问题,高并发 产生的原因还包括其他原因,如带宽,数据库读取速度等等,还需加大带宽,多级数据库,优化数据的检索等等方法

    补充:golang 高并发任务处理方案

    这个主要用golang 的chan 和routine属性做的,比很多语言方便多了,可以参考参考

    //任务的请求
    type MtaskRequest struct {
        Ceshi int
        // [redacted]
    }
     
    //job队列+work池
    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )
     
    // Job represents the job to be run
    type Job struct {
        MtaskRequest MtaskRequest
    }
     
    // A buffered channel that we can send work requests on.
     
    // var JobQueue chan Job ---这样申明会卡主,没有初始化
    var JobQueue = make(chan Job)
     
    // Worker represents the worker that executes the job
    type Worker struct {
        WorkerPool chan chan Job
        JobChannel chan Job
        quit       chan bool
    }
     
    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool,
            JobChannel: make(chan Job),
            quit:       make(chan bool)}
    }
     
    // Stop signals the worker to stop listening for work requests.
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }
     
    type Dispatcher struct {
        // A pool of workers channels that are registered with the dispatcher
        WorkerPool chan chan Job
        maxWorkers int
    }
     
    func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
    }
     
    // Start method starts the run loop for the worker, listening for a quit channel in
    // case we need to stop it
    func (w Worker) Start() {
        go func() {
            for {
                // register the current worker into the worker queue.
                w.WorkerPool <- w.JobChannel
                select {
                case <-w.JobChannel:
                    time.Sleep(5 * time.Second)
                    // we have received a work request.
                    fmt.Println("调起worker")
                case <-w.quit:
                    // we have received a signal to stop
                    return
                    //不能写default
                }
            }
        }()
    }
     
    func (d *Dispatcher) Run() {
        //启动一定数量的worker
        fmt.Println("启动一定数量的worker")
        for i := 0; i < d.maxWorkers; i++ {
            worker := NewWorker(d.WorkerPool)
            worker.Start()
        }
     
        go d.dispatch()
    }
     
    //分派任务
    func (d *Dispatcher) dispatch() {
        for {
            select {
            case job := <-JobQueue: //接收一个job请求
                fmt.Println("JobQueue 收到请求")
     
                go func(job Job) {
                    // try to obtain a worker job channel that is available.
                    // this will block until a worker is idle
                    jobChannel := <-d.WorkerPool
                    // dispatch the job to the worker job channel
                    jobChannel <- job
                }(job)
            }
        }
    }
     
    //接收到红包数据
    func (this *TaskRedbao) UserGetRedbao(red_id, uid, shop_id, rand_arr, Amoney string) error {
        fmt.Println("收到 接收到红包数据 http请求")
        mtaskRequest := MtaskRequest{67}
        work := Job{MtaskRequest: mtaskRequest}
     
        JobQueue <- work
        return nil
    }

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持站长博客。如有错误或未考虑完全的地方,望不吝赐教。

    js
    下一篇:没有了