个推push数据统计(爬虫)
阅读原文时间:2023年07月13日阅读:3

该方案是一个基于任务调度框架Gearman、采用Python开发的分布式数据统计系统。

项目的目录结构很简单:

# apple at localhost in ~/Develop/getui [11:24:26]
$ tree
.
├── Browser.py
├── PickleGearman.py
├── SpiderWorker.py
└── countPushNum.py

0 directories, 4 files

在我们的MacBook Pro上安装并启动Gearman:

# apple at liujingyu.local in ~/Develop/getui [::]
$ brew install gearman
$ gearmand -d -L 127.0.0.1 -p 4307

Python需要安装gearman、mechanize等库(pip用于安装常用的包,具体安装方法见 https://pip.pypa.io/en/latest/installing.html#install-pip):

# apple at liujingyu.local in ~/Develop/getui [::]
$ pip install gearman mechanize

worker之间发送、接收Python对象。

$ cat PickleGearman.py
#!/usr/bin/env python
#coding:utf-8

import pickle
import gearman

class PickleDataEncoder(gearman.DataEncoder):
    """Gearman data encoder that serializes job payloads with pickle.

    Plugging this encoder into a worker/client lets them exchange picklable
    Python objects instead of plain strings.
    """

    @classmethod
    def encode(cls, encodable_object):
        """Return the pickled representation of ``encodable_object``."""
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        """Return the Python object unpickled from ``decodable_string``."""
        # SECURITY: pickle.loads can execute arbitrary code embedded in the
        # payload, so only talk to job servers and peers that are trusted.
        return pickle.loads(decodable_string)

class PickleWorker(gearman.GearmanWorker):
    """Gearman worker that uses PickleDataEncoder for job data and results."""

    # Encoder class used by the gearman base class for payloads.
    data_encoder = PickleDataEncoder

class PickleClient(gearman.GearmanClient):
    """Gearman client that uses PickleDataEncoder for job data and results."""

    # Encoder class used by the gearman base class for payloads.
    data_encoder = PickleDataEncoder

运行图:

8个Spider运行过程图:

Spider代码:

$ cat SpiderWorker.py
#!/usr/bin/env python

from PickleGearman import PickleWorker
from Browser import Browser

class GearmanWorker(PickleWorker):
    """Pickle-aware worker; ``on_job_execute`` is kept as an override hook."""

    def on_job_execute(self, current_job):
        """Execute ``current_job`` by delegating to the parent implementation."""
        return super(GearmanWorker, self).on_job_execute(current_job)

def SpiderWorker(gearman_worker, gearman_job):
    """Gearman task callback: scrape push statistics for a batch of task ids.

    Args:
        gearman_worker: The worker invoking this callback (unused here).
        gearman_job: The job to run; its ``data`` attribute is handed to
            ``Browser`` as the batch of task ids.

    Returns:
        Whatever ``Browser`` returns for the batch, or an empty list when
        scraping raised an exception.
    """
    import logging  # local import: the module header does not import it

    taskIds = gearman_job.data

    try:
        return Browser(taskIds)
    except Exception:
        # Previously this logged through an undefined ``config`` name and then
        # returned an unbound local; log the traceback and hand back an empty
        # result instead so the failure is visible without a secondary error.
        logging.exception('SpiderWorker failed for task ids %r', taskIds)
        return []

# Connect to the local gearmand instance, register the callback under the
# task name the client submits, then enter the worker's job-processing loop.
job_servers = ['127.0.0.1:4307']
worker = GearmanWorker(job_servers)
worker.register_task("SpiderWorker", SpiderWorker)
worker.work()

countPushNum.py代码:

# apple at localhost in ~/Develop/getui [11:30:38]
$ cat countPushNum.py
#!/usr/bin/python
# -*- coding: utf-8 -*-

import cookielib
import json
import socket
socket.setdefaulttimeout(10)
import redis
import mechanize
from PickleGearman import PickleClient
import numpy as np
currency = 30

def printEveryGroupMsg(groupSum):
    """Print the column header followed by one group's aggregated counters.

    Args:
        groupSum: The aggregated value to print; callers in this file pass
            the result of ``np.sum(..., 0)``.
    """
    # Header columns: sendable count, actually sent count, received count.
    # Single-argument print(...) prints the same text under the Python 2
    # print statement and the Python 3 function.
    print('有效可发送数 实际下发数 收到数')
    print(groupSum)

def _chunks(items, size):
    """Split ``items`` into consecutive slices of at most ``size`` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _has_three_counters(content):
    """Return True when ``content`` is a sized result holding exactly 3 values."""
    try:
        return len(content) == 3
    except TypeError:
        # Results without a length (e.g. None, or the scalar np.sum returns
        # for an empty list) are not usable counter triples.
        return False


def _print_counts(conn, pattern):
    """Print every key on ``conn`` matching ``pattern`` with its value."""
    for key in conn.keys(pattern):
        print('%s %s' % (key, conn.get(key)))


def _print_push_stats(conn, client, pattern):
    """Submit scraping jobs for each task-id set on ``conn`` and print sums."""
    for key in conn.keys(pattern):
        print(key)
        taskIds = list(conn.smembers(key))

        # Each job carries up to ``currency`` task ids, and jobs are submitted
        # in batches of up to ``currency`` jobs.
        jobs = [dict(task='SpiderWorker', data=batch)
                for batch in _chunks(taskIds, currency)]

        everyGroup = []
        for per_jobs in _chunks(jobs, currency):
            completed_requests = client.submit_multiple_jobs(per_jobs)
            for current_request in completed_requests:
                content = current_request.result
                if _has_three_counters(content):
                    everyGroup.append(content)
        printEveryGroupMsg(np.sum(everyGroup, 0))


def main():
    """Print per-key totals and aggregated push statistics from both Redis hosts.

    Reads the module-level ``yesterday`` date string (set by the script entry
    point) to build the Redis key patterns, and ``currency`` for batch sizes.
    """
    gearman_clients = PickleClient(['127.0.0.1:4307'])
    r1 = redis.Redis(host='xxx.xx.xx.x', port=6379, db=0, password='pasword')
    r2 = redis.Redis(host='xx.xx.xx.xx', port=6379, db=0, password='pasword')
    connections = (r1, r2)

    # Total counts. The pattern must start with a bare '*' wildcard; the
    # escaped form '\*' would only match a literal asterisk.
    yesterdaykeys = '*' + yesterday + ':count'
    for conn in connections:
        _print_counts(conn, yesterdaykeys)

    # Push statistics.
    yesterdaykeys = '*' + yesterday + ':taskIds'
    for conn in connections:
        _print_push_stats(conn, gearman_clients, yesterdaykeys)

if __name__ == '__main__':
    from datetime import date, timedelta

    # Python 2's input() eval()s whatever is typed and raises on an empty
    # line, which made the ``or 0`` fallback unreachable. Read raw text and
    # convert explicitly; fall back to input() where raw_input does not exist.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    # Number of days to go back; an empty answer means 0 (today).
    # The prompt asks for a time and says to enter 1 for yesterday.
    day = int(read_line('请输入时间<昨天请输入1>\n>').strip() or 0)

    # main() reads ``yesterday`` as a module-level global.
    yesterday = (date.today() - timedelta(day)).strftime('%y%m%d')
    today = (date.today() - timedelta(0)).strftime('%y%m%d')

    main()

抓取模块代码:

$ cat Browser.py
#!/usr/bin/env python
#coding:utf-8

import mechanize
import numpy as np
import cookielib,json

def _fetch_task_counts(br, taskId):
    """Query the push record for ``taskId`` and return its counters.

    Args:
        br: A logged-in ``mechanize.Browser``.
        taskId: The push task id to look up.

    Returns:
        ``[sendNum, realSendNum, receiveNum]`` as ints on success; a shorter
        (possibly empty) list when the response has no ``resultList`` or the
        lookup failed part-way. Errors are printed, not raised.
    """
    tsum = []
    try:
        response = br.open('https://dev.getui.com/dos/pushRecords/queryApiPushList?curPage=1&appId=16500&taskId=%s' % taskId)
        html = response.read()

        result = json.loads(html.strip())
        if 'resultList' in result:
            record = result['resultList'][0]
            for field in ('sendNum', 'realSendNum', 'receiveNum'):
                tsum.append(int(record[field]))
    except Exception as e:
        # Best effort: one bad task id must not abort the whole batch.
        print(e)
    else:
        print(tsum)
    return tsum


def Browser(taskIds):
    """Log in to the getui developer site and sum push counters for task ids.

    Args:
        taskIds: Iterable of push task ids to look up.

    Returns:
        ``np.sum(rows, 0)`` over the collected
        ``[sendNum, realSendNum, receiveNum]`` rows, i.e. a 3-element array of
        column sums. When no task yields a complete row, the sum of the empty
        list is returned (a scalar, not an array).
    """
    url = 'http://dev.igetui.com/login.htm'
    # Browser
    br = mechanize.Browser()

    # Cookie Jar
    cj = cookielib.LWPCookieJar()
    br.set_cookiejar(cj)

    # Browser options
    br.set_handle_equiv(True)
    br.set_handle_gzip(True)
    br.set_handle_redirect(True)
    br.set_handle_referer(True)
    br.set_handle_robots(False)

    # Follows refresh 0 but not hangs on refresh > 0
    br.set_handle_refresh(mechanize._http.HTTPRefreshProcessor(), max_time=1)

    # Debugging output stays off.
    br.set_debug_http(False)
    br.set_debug_redirects(False)
    br.set_debug_responses(False)

    # Present a desktop Firefox User-Agent. Adjacent literals are used instead
    # of a backslash continuation so no indentation leaks into the header.
    br.addheaders = [('User-agent',
                      'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.1) '
                      'Gecko/2008071615 Fedora/3.0.1-1.fc9 Firefox/3.0.1')]

    # Open the login page and submit the login form.
    br.open(url)
    br.select_form(name='loginForm')

    # Login user name and password.
    br['username'] = 'getui'
    br['password'] = 'password'
    br.submit()

    everyGroup = []
    for taskId in taskIds:
        tsum = _fetch_task_counts(br, taskId)
        # Only complete triples take part in the column sums.
        if len(tsum) == 3:
            everyGroup.append(tsum)

    return np.sum(everyGroup, 0)