上一篇博客地址:python代理池的构建4——mongdb数据库的增删改查
一、对数据库里面代理ip检查(proxy_test.py)
#-*-coding:utf-8-*-
'''
目的:检查代理IP可用性,保证代理池中代理IP基本可用
思路
1.在proxy. _test.py中, 创建ProxyTester类
2.提供-一个run 方法,用于处理检测代理IP核心逻辑
2.1.从数据库中获取所有代理IP
2.2.遍历代理IP列表
2.3.检查代理可用性
如果代理不可用,让代理分数-1,如果代理分数等于0就从数据库中删除该代理,否则更新该代理IP
如果代理可用,就恢复该代理的分数,更新到数据库中
3.为了提高检查的速度,使用异步来执行检测任务
3.1把要检测的代理IP,放到队列中
3.2把检查一个代理可用性的代码,抽取到一一个方法中;从队列中获取代理IP,进行检查;检查完毕,
调度队列的task_done方法
3.3通过异步回调,使用死循环不断执行这个方法,
3.4开启多个一个异步任务,来处理代理IP的检测;可以通过配置文件指定异步数量
4.使用schedule 模块,每隔一定的时间,执行一-次检测任务
4.1定义类方法start ,用于启动检测模块
4.2在start方法中
创建本类对象
调用run方法
每间隔一定时间,执行一下run方法
'''
import sys
import time
import schedule
from queue import Queue
from gevent import monkey
monkey.patch_all() #打上猴子补丁,python代理池的构建3——爬取代理ip 里面有对应链接
from gevent.pool import Pool #导入代理池
sys.path.append("..")
from settings import MAX_SCORE,TEST_PROXIES_ASYNC_COUNT,TEST_PROXIES_INTERVAL
from proxy_validate.httpbin_validator import check_proxy
from db.mongo_pool import MongoPool
class ProxyTest(object):
def \_\_init\_\_(self):
self.mongo\_pool = MongoPool()
self.coroutine\_pool = Pool()
self.queue=Queue() #定义一个队列,用来放置mongdb数据库里面代理ip
def \_\_check\_callback(self,temp):
#这个的意思就是一直等到队列里面没有代理ip了才停止执行self.\_\_check\_one这个函数
self.coroutine\_pool.apply\_async(self.\_\_check\_one,callback=self.\_\_check\_callback)
def run(self):
proxies = self.mongo\_pool.find\_all()
for proxy in proxies:
#print(proxy.\_\_dict\_\_)
self.queue.put(proxy)
#self.\_\_check\_one(proxy)
for i in range(TEST\_PROXIES\_ASYNC\_COUNT):
#这个TEST\_PROXIES\_ASYNC\_COUNT是一个变量,这个for就是来控制去检查ip是否可用最多开TEST\_PROXIES\_ASYNC\_COUNT
#个数量的代理池,因为代理ip数据库中可能有好多,这样的话你给每一个代理ip都开一个协程。也会给系统带来很大负担
self.coroutine\_pool.apply\_async(self.\_\_check\_one,callback=self.\_\_check\_callback)
self.queue.join() #注意,这个是让先执行完的等一下没执行完得
def \_\_check\_one(self):
proxy = self.queue.get()
print(proxy.\_\_dict\_\_)
proxy = check\_proxy(proxy)
if proxy.speed == -1:
proxy.score -= 1
if proxy.score == 0:
self.mongo\_pool.delete\_one(proxy)
else:
self.mongo\_pool.update\_one(proxy)
else:
proxy.score = MAX\_SCORE
self.mongo\_pool.update\_proxy(proxy)
self.queue.task\_done()
@classmethod #类方法
def start(cls):
db = ProxyTest()
db.run()
schedule.every(TEST\_PROXIES\_INTERVAL).hours.do(db.run)
while True:
schedule.run\_pending()
time.sleep(60)
if __name__ == '__main__':
ProxyTest.start()
二、一些问题的解决
给一个python3队列讲解的链接(感觉还可以):https://www.cnblogs.com/dbf-/p/11118628.html
join
会在队列存在未完成任务时阻塞,等待队列无未完成任务,需要配合 task_done
使用
执行一次 put
会让未完成任务 +1 ,但是执行 get
并不会让未完成任务 -1 ,需要使用 task_done
让未完成任务 -1 ,否则 join
就无法判断
Queue.get(block=True, timeout=None)
get_nowait() = get(block=False)
当队列空了之后,get 就会阻塞,一直等待队列中有数据后再获取数据
Queue.put(block=True, timeout=None)
block
用于设置是否阻塞, timeout
用于设置阻塞时等待时长
put_nowait() = put(block=False)
当队列满了之后,put 就会阻塞,一直等待队列不再满时向里面添加数据
三、python代理池的构建的其他链接
python代理池的构建1——代理IP类的构建,以及配置文件、日志文件、requests请求头
四、关于代码一些问题解决链接:
python中schedule模块的简单使用 || importlib.import_module动态导入模块
Python中“*”和“**”的用法 || yield的用法 || ‘$in’和'$nin' || python @property的含义
手机扫一扫
移动阅读更方便
你可能感兴趣的文章