本方案是一个基于任务调度框架 Gearman、使用 Python 开发的分布式数据统计系统。
项目的目录结构很简单:
# apple at localhost in ~/Develop/getui [11:24:26]
$ tree
.
├── Browser.py
├── PickleGearman.py
├── SpiderWorker.py
└── countPushNum.py
0 directories, 4 files
在 MacBook Pro 上安装并启动 Gearman(监听 127.0.0.1:4307):
# apple at liujingyu.local in ~/Develop/getui [::]
$ brew install gearman
$ gearmand -d -L 127.0.0.1 -p 4307
Python 端需要安装 gearman、mechanize、redis、numpy 等库(pip 是常用的 Python 包管理工具,安装方法见 https://pip.pypa.io/en/latest/installing.html#install-pip):
# apple at liujingyu.local in ~/Develop/getui [::]
$ pip install gearman mechanize redis numpy
PickleGearman.py 用 pickle 作为 Gearman 的数据编码器,使 worker 与 client 之间可以直接收发 Python 对象。
$ cat PickleGearman.py
#!/usr/bin/env python
#coding:utf-8
import pickle
import gearman
class PickleDataEncoder(gearman.DataEncoder):
    """Gearman data encoder that (de)serialises job payloads with pickle.

    Lets clients and workers exchange arbitrary Python objects (for example
    lists of task IDs and numpy arrays) instead of plain strings.

    SECURITY: ``pickle.loads`` can execute arbitrary code while decoding.
    Only use this encoder with a job server and peers you fully trust
    (in this setup gearmand is bound to 127.0.0.1).
    """

    @classmethod
    def encode(cls, encodable_object):
        """Return the pickled byte string for *encodable_object*."""
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        """Return the object reconstructed from the pickled *decodable_string*."""
        return pickle.loads(decodable_string)
class PickleWorker(gearman.GearmanWorker):
    """GearmanWorker whose job payloads are pickled Python objects."""

    # Encoder applied to job data exchanged with the job server.
    data_encoder = PickleDataEncoder
class PickleClient(gearman.GearmanClient):
    """GearmanClient that submits and receives pickled Python objects."""

    # Same encoder as PickleWorker so both ends agree on the wire format.
    data_encoder = PickleDataEncoder
运行图:
8个Spider运行过程图:
Spider代码:
$ cat SpiderWorker.py
#!/usr/bin/env python
from PickleGearman import PickleWorker
from Browser import Browser
class GearmanWorker(PickleWorker):
    """PickleWorker subclass used by this script.

    The override below only delegates to the parent implementation;
    presumably it is kept as a hook for per-job logging or instrumentation.
    """

    def on_job_execute(self, current_job):
        """Run *current_job* through the parent implementation and return its result."""
        base = super(GearmanWorker, self)
        return base.on_job_execute(current_job)
def SpiderWorker(gearman_worker, gearman_job):
    """Gearman task handler: collect push statistics for a batch of task IDs.

    Args:
        gearman_worker: The worker running the job (unused).
        gearman_job: Job whose ``data`` is the unpickled list of task IDs
            submitted by countPushNum.py.

    Returns:
        Whatever ``Browser(taskIds)`` returns. If Browser raises, the error
        is logged and an empty list is returned. The client only keeps
        3-element results, so it discards this batch instead of the job
        failing.
    """
    import logging

    taskIds = gearman_job.data
    try:
        return Browser(taskIds)
    except Exception:
        # Best effort: log with the traceback and hand back a value the client
        # filters out. No name is left unbound and no undefined module is used.
        logging.exception('Browser failed for task ids %r', taskIds)
        return []
# Connect to the local gearmand started with `gearmand -d -L 127.0.0.1 -p 4307`.
worker = GearmanWorker(['127.0.0.1:4307'])
# Serve SpiderWorker under the task name countPushNum.py submits jobs to.
worker.register_task("SpiderWorker", SpiderWorker)
# Enter the worker loop (python-gearman keeps polling for jobs; confirm stop conditions).
worker.work()
countPushNum.py代码:
# apple at localhost in ~/Develop/getui [11:30:38]
$ cat countPushNum.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import cookielib
import json
import socket
socket.setdefaulttimeout(10)
import redis
import mechanize
from PickleGearman import PickleClient
import numpy as np
# Batch size used twice in main(): each SpiderWorker job carries up to this
# many task IDs, and jobs are submitted to gearmand this many at a time.
currency = 30
def printEveryGroupMsg(groupSum):
    """Print the column legend followed by the summed counts for one key.

    Args:
        groupSum: Value shown under the legend. main() passes the
            column-wise numpy sum of the collected count triples.
    """
    # Legend: valid sendable count / actually delivered count / received count.
    legend = '有效可发送数 实际下发数 收到数'
    for line in (legend, groupSum):
        print(line)
def _chunks(seq, size):
    """Split *seq* into consecutive slices of at most *size* items."""
    return [seq[i:i + size] for i in range(0, len(seq), size)]


def _is_count_triple(content):
    """Return True when a job result is a flat 3-element sequence of counts."""
    # Failed jobs come back with result None, and a scalar (0-d) result has no
    # len(). Checking the shape rejects both instead of raising TypeError.
    return content is not None and np.shape(content) == (3,)


def _print_totals(r, pattern):
    """Print every Redis key matching *pattern* followed by its value."""
    # NOTE(review): KEYS is O(N) and blocks the Redis server; consider
    # scan_iter() on large instances.
    for key in r.keys(pattern):
        print('%s %s' % (key, r.get(key)))


def _print_push_stats(r, gearman_clients, pattern):
    """Fan each matching task-ID set out to SpiderWorker jobs and print the sums."""
    for key in r.keys(pattern):
        print(key)
        taskIds = list(r.smembers(key))
        # One job per chunk of `currency` task IDs ...
        jobs = [dict(task='SpiderWorker', data=chunk)
                for chunk in _chunks(taskIds, currency)]
        everyGroup = []
        # ... and at most `currency` jobs in flight per submission round.
        for per_jobs in _chunks(jobs, currency):
            completed_requests = gearman_clients.submit_multiple_jobs(per_jobs)
            for current_request in completed_requests:
                content = current_request.result
                if _is_count_triple(content):
                    everyGroup.append(content)
        printEveryGroupMsg(np.sum(everyGroup, 0))


def main():
    """Print one day's Redis counters and per-key push statistics.

    Uses the module-level ``yesterday`` string (``%y%m%d``, set in the
    ``__main__`` section). For each of the two Redis instances it:

    1. prints every ``*<yesterday>:count`` key with its value;
    2. for every ``*<yesterday>:taskIds`` set, splits the members into
       chunks of ``currency`` task IDs and runs each chunk as a
       ``SpiderWorker`` Gearman job. It then prints the column-wise sum of
       the returned [sendNum, realSendNum, receiveNum] triples.

    All totals are printed before any push statistics, r1 before r2.
    """
    gearman_clients = PickleClient(['127.0.0.1:4307'])
    # NOTE: placeholder hosts/passwords; load real ones from configuration.
    r1 = redis.Redis(host='xxx.xx.xx.x', port=6379, db=0, password='pasword')
    r2 = redis.Redis(host='xx.xx.xx.xx', port=6379, db=0, password='pasword')
    servers = (r1, r2)

    # Totals. A plain '*' is needed: '\*' would match a literal asterisk.
    count_pattern = '*' + yesterday + ':count'
    for r in servers:
        _print_totals(r, count_pattern)

    # Push statistics.
    task_pattern = '*' + yesterday + ':taskIds'
    for r in servers:
        _print_push_stats(r, gearman_clients, task_pattern)
if __name__ == '__main__':
    from datetime import date, timedelta

    # Python 2's input() eval()s the reply, and an empty line raises
    # SyntaxError, so an `or 0` default could never apply. Python 3's input()
    # returns str, which timedelta rejects. Read raw text and convert it.
    try:
        read_line = raw_input
    except NameError:  # Python 3
        read_line = input
    reply = read_line('请输入时间<昨天请输入1>\n>').strip()
    # Number of days back from today; an empty reply means today (0).
    day = int(reply) if reply else 0
    yesterday = (date.today() - timedelta(day)).strftime('%y%m%d')
    today = (date.today() - timedelta(0)).strftime('%y%m%d')
    main()
抓取模块代码:
$ cat Browser.py
#!/usr/bin/env python
#coding:utf-8
import mechanize
import numpy as np
import cookielib,json
def _login():
    """Open the getui login page, submit the login form and return the browser.

    Cookies are kept in an LWPCookieJar. This presumably carries the login
    session to later requests made with the same browser.
    """
    url = 'http://dev.igetui.com/login.htm'
    br = mechanize.Browser()
    br.set_cookiejar(cookielib.LWPCookieJar())
    # Browser options
    br.set_handle_equiv(True)
    br.set_handle_gzip(True)
    br.set_handle_redirect(True)
    br.set_handle_referer(True)
    br.set_handle_robots(False)
    # Follow "refresh 0" but do not hang on refresh > 0.
    br.set_handle_refresh(mechanize._http.HTTPRefreshProcessor(), max_time=1)
    br.set_debug_http(False)
    br.set_debug_redirects(False)
    br.set_debug_responses(False)
    # Present a desktop Firefox user agent.
    br.addheaders = [('User-agent',
                      'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.1) '
                      'Gecko/2008071615 Fedora/3.0.1-1.fc9 Firefox/3.0.1')]
    br.open(url)
    br.select_form(name='loginForm')
    # NOTE: hard-coded credentials; move them to configuration for real use.
    br['username'] = 'getui'
    br['password'] = 'password'
    br.submit()
    return br


def _fetch_counts(br, taskId):
    """Return [sendNum, realSendNum, receiveNum] for *taskId*, or None if absent.

    Raises whatever the request, JSON decoding or int() conversion raises.
    """
    response = br.open('https://dev.getui.com/dos/pushRecords/queryApiPushList?curPage=1&appId=16500&taskId=%s' % taskId)
    result = json.loads(response.read().strip())
    resultList = result.get('resultList')
    if not resultList:  # missing key, None or an empty list: nothing to count
        return None
    first = resultList[0]
    return [int(first['sendNum']),
            int(first['realSendNum']),
            int(first['receiveNum'])]


def Browser(taskIds):
    """Log in once and sum the push counters for every task in *taskIds*.

    Args:
        taskIds: Iterable of task IDs (one SpiderWorker job's chunk).

    Returns:
        numpy array [sum(sendNum), sum(realSendNum), sum(receiveNum)]. It is
        all zeros when no task produced counts, so callers always get a
        3-element result.
    """
    br = _login()
    everyGroup = []
    try:
        for taskId in taskIds:
            try:
                tsum = _fetch_counts(br, taskId)
            except Exception as e:
                # Best effort: one failing task must not sink the whole batch.
                print(e)
                continue
            if tsum is not None:
                print(tsum)
                everyGroup.append(tsum)
    finally:
        br.close()
    if not everyGroup:
        # np.sum([], 0) would return the scalar 0.0, which has no len().
        return np.zeros(3, dtype=int)
    return np.sum(everyGroup, 0)
手机扫一扫
移动阅读更方便
你可能感兴趣的文章