鍍金池/ 問答/Java  Linux/ 如何手動(dòng)擼一個(gè)能夠滿足下列并發(fā)的隊(duì)列?

如何手動(dòng)擼一個(gè)能夠滿足下列并發(fā)的隊(duì)列?

在做puv統(tǒng)計(jì)時(shí)碰到的一個(gè)問題,用戶請(qǐng)求過(guò)來(lái)會(huì)記錄為一個(gè)pv,記錄到redis中,但由于pv量太大會(huì)給redis造成過(guò)大壓力,所以做個(gè)緩存,當(dāng)pv滿10條了發(fā)一次。用了一個(gè)隊(duì)列ArrayList實(shí)現(xiàn),但隊(duì)列的插入、刪除在并發(fā)條件下不可行,所以在方法上加了synchronized:

static ArrayList<Object> pvList = new ArrayList<Object>();
public synchronized void countPv(...){
    //........生成一個(gè)PV對(duì)象
    pvList.add(PV)
    if(pvList.size()>10){
        //前10個(gè)加到redis
        addRedis(pvList.subList(0,10))
    }
    //刪除10個(gè)
    pvList.subList(0,10).clear();
}

但在壓力測(cè)試中,如果已滿負(fù)荷的連續(xù)壓測(cè),發(fā)現(xiàn)會(huì)丟掉一些pv,可能是synchronized造成的堵塞導(dǎo)致,如何更好的實(shí)現(xiàn)這個(gè)需求呢?

每10個(gè)請(qǐng)求發(fā)一次,而后刪除,同時(shí)可以滿足不斷累加

  1. synchronized放到函數(shù)里面估計(jì)提升并不大,畢竟add,size,clear方法都必須滿足同步需求
  2. ConcurrnetLinkedQueue, Concurrent...Array刪除、查找的開銷都非常大,而且貌似沒法用于這種場(chǎng)合

p.s. 您能留段偽碼就最好了

回答
編輯回答
薄荷綠

@fengdui 有一句評(píng)論說(shuō)得好,Redis操作要搬出去。

方法一:

// 把10個(gè)元素return出去讓外面的調(diào)用者去調(diào)Redis,別占用同步塊的時(shí)間
public synchronized List<Object> countPv(...) {
  ...
}

方法二:開個(gè)線程池去異步發(fā)Redis,但是機(jī)器重啟會(huì)丟失來(lái)不及發(fā)送的數(shù)據(jù)

if (pvList.size() > 10) {
        //前10個(gè)打包成任務(wù)扔給線程池
        senderExecutor.execute(new SendTask(new ArrayList<>(pvList.subList(0,10)))); //當(dāng)場(chǎng)復(fù)制了subList
        pvList.subList(0,10).clear(); //這行要移到這里,這可能就是你丟數(shù)據(jù)的原因
}
2018年9月14日 20:31
編輯回答
膽怯
2017年1月30日 02:40
編輯回答
深記你

java線程池,怎么樣

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
    final int index = i;
    singleThreadExecutor.execute(new Runnable() {
 
        @Override
        public void run() {
            try {
                // TODO 干你想干的
                addRedis(pvList.subList(0,10))
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
}

Java(Android)線程池

2018年6月20日 23:12
編輯回答
臭榴蓮

pipeline ?

2017年2月24日 16:51
編輯回答
乖乖噠

不知道你說(shuō)的滿負(fù)荷壓測(cè)時(shí)丟pv指的是什么,有沒有記錄這種情況下,服務(wù)器返回的狀態(tài)碼是什么?是因?yàn)槌瑫r(shí)么?
如果是因?yàn)閴毫^(guò)大超出服務(wù)器載能力,換用或其他數(shù)據(jù)結(jié)構(gòu)也不見得好到哪去, 如果想解鎖synchronized, 可能試試多例模式. 每個(gè)服務(wù)線程創(chuàng)建自己私有的緩存.

2017年5月23日 15:12
編輯回答
拮據(jù)

不要用synchronized
ConcurrnetLinkedQueue 用法不對(duì) subList(0,10)??? 開個(gè)線程從隊(duì)尾取 取了10個(gè)就 處理
你這10個(gè)扔redis搞什么意思 (還不如直接扔redis)
可以扔消息隊(duì)列里面 然后異步的處理

貼一段代碼
https://www.cnblogs.com/linji...
運(yùn)行結(jié)果:
costtime 2360ms

改用while (queue.size()>0)后
運(yùn)行結(jié)果:
cost time 46422ms

結(jié)果居然相差那么大,看了下ConcurrentLinkedQueue的API原來(lái).size()是要遍歷一遍集合的,難怪那么慢,所以盡量要避免用size而改用isEmpty().

總結(jié)了下, 在單位缺乏性能測(cè)試下,對(duì)自己的編程要求更加要嚴(yán)格,特別是在生產(chǎn)環(huán)境下更是要小心謹(jǐn)慎。

2017年6月30日 12:28
編輯回答
笑忘初
public synchronized void countPv(...){
    //........生成一個(gè)PV對(duì)象
   
    pvList.add(PV)
    if(pvList.size() > 10){
        addRedis(pvList.subList(0,10))
    }

    pvList.subList(0,10).clear();
}

pvList.subList(0,10).clear(); 這句代碼不應(yīng)該寫到 if 里面嗎?
既然 pvList 是 static 的(類變量),那么 countPV 也應(yīng)該是 static 的才對(duì),這樣 synchronize 使用的鎖才會(huì)是 class,而不是對(duì)象。

2018年5月12日 20:27