环境
spark-1.6
python3.5
一、wordcount
# -*- coding:utf-8 -*-
'''
Created on 2019年5月13日
@author: Administrator
'''
#从pyspark中导入相应的包
from pyspark import SparkConf
from pyspark import SparkContext
def show(x):
    """Print a single value to standard output.

    x: any printable object (used below to print word-count tuples).
    """
    print(x)
if __name__ == '__main__':
    # Build the configuration: application name "wordcount", local master.
    conf = SparkConf().setAppName("wordcount").setMaster("local")
    # The configuration must be passed by keyword (conf=conf).
    sc = SparkContext(conf=conf)
    try:
        # Reduce log noise.
        sc.setLogLevel("WARN")
        # Read the input line by line, requesting 2 partitions.
        lines = sc.textFile("../../data/words", 2)
        print("lines rdd partition length = %d"%(lines.getNumPartitions()))
        # Split every line on single spaces to obtain the individual words.
        words = lines.flatMap(lambda line: line.split(" "),
                              preservesPartitioning=True)
        # Pair each word with an initial count of 1.
        pairWords = words.map(lambda word: (word, 1),
                              preservesPartitioning=True)
        # Sum the counts per word, producing 3 result partitions.
        result = pairWords.reduceByKey(lambda v1, v2: v1 + v2, numPartitions=3)
        print("result rdd partition length = %d"%(result.getNumPartitions()))
        # Print every (word, count) pair.
        result.foreach(show)
        # Persist the result as text files.
        result.saveAsTextFile("../../data/wc-result")
    finally:
        # Always release the context, even when a step above fails
        # (for example when the output directory already exists).
        sc.stop()
二、PVUV
# -*- coding:utf-8 -*-
'''
Created on 2019年5月16日
@author: Administrator
'''
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from builtins import sorted
# Print a single result element.
def showresult(em):
    """Print one result element to standard output.

    em: any printable object (the callers below pass (site, count) tuples).
    """
    print(em)
# Sample record (tab-separated fields):
# 7.213.213.208 吉林 2018-03-29 1522294977303 1920936170939152672 www.dangdang.com Login
# Page views (PV).
def pv(lines):
    """Print the number of page views per site, highest count first.

    lines: RDD of tab-separated log records; field 5 is read as the site.
    """
    sitepair = lines.map(lambda line: (line.split("\t")[5], 1))
    totals = sitepair.reduceByKey(lambda v1, v2: v1 + v2)
    # Sort by count, descending.
    ranked = totals.sortBy(lambda one: one[1], ascending=False)
    # Collect to the driver before printing: printing inside foreach() runs
    # on the executors, where output order is not guaranteed and the output
    # does not reach the driver's stdout on a cluster.
    for em in ranked.collect():
        showresult(em)
# Unique visitors (UV).
def uv(lines):
    """Print the number of distinct visiting IPs per site, highest first.

    lines: RDD of tab-separated log records; field 0 is read as the client
        IP and field 5 as the site.
    """
    def ip_and_site(line):
        fields = line.split("\t")
        return fields[0], fields[5]

    # De-duplicate on the (ip, site) pair. A tuple key is used instead of
    # joining with "_" and splitting again, which would mis-attribute any
    # site whose name contains an underscore.
    unique_visits = lines.map(ip_and_site).distinct()
    result = (unique_visits
              .map(lambda visit: (visit[1], 1))
              .reduceByKey(lambda v1, v2: v1 + v2)
              .sortBy(lambda one: one[1], ascending=False))
    # Print on the driver so the sorted order is what gets displayed.
    for one in result.collect():
        showresult(one)
def uvExceptBJ(lines):
    """Print unique visitors per site, ignoring records located in 北京.

    lines: RDD of tab-separated log records; field 0 is read as the client
        IP, field 1 as the location and field 5 as the site.
    """
    # Split each record once and reuse the fields.
    records = lines.map(lambda line: line.split("\t"))
    # Keep (ip, site) as a tuple key rather than an "_"-joined string, so a
    # site containing an underscore is still counted under its full name.
    usiteviews = (records
                  .filter(lambda fields: fields[1] != "北京")
                  .map(lambda fields: (fields[0], fields[5]))
                  .distinct())
    result1 = (usiteviews
               .map(lambda visit: (visit[1], 1))
               .reduceByKey(lambda v1, v2: v1 + v2))
    result2 = result1.sortBy(lambda one: one[1], ascending=False)
    # Print on the driver so the sorted order is what gets displayed.
    for em in result2.collect():
        showresult(em)
def getTop2Location(lines):
    """Print, for every site, its two most frequent locations.

    lines: RDD of tab-separated log records; field 5 is read as the site
        and field 1 as the location.
    """
    def site_and_location(line):
        fields = line.split("\t")
        return fields[5], fields[1]

    # Group all locations by site.
    site_locations = lines.map(site_and_location).groupByKey()
    result = site_locations.map(getCurrSiteTop2Location).collect()
    for em in result:
        print(em)
def getCurrSiteTop2Location(one):
    """Return the two most frequent locations of one site.

    one: (site, locations) pair, where locations is an iterable holding one
        location value per record of that site.

    Returns (site, top), where top is a list of at most two
    (location, count) tuples ordered by count, highest first.
    """
    site = one[0]
    locations = one[1]
    # Count the occurrences of every location for this site.
    locationdict = {}
    for location in locations:
        locationdict[location] = locationdict.get(location, 0) + 1
    # Order by count, descending, using the built-in sort.
    sortedList = sorted(locationdict.items(), key=lambda kv: kv[1], reverse=True)
    # Slicing also covers sites with fewer than two locations.
    return site, sortedList[:2]
def getTopOperation(lines):
    """Print, for every site, its most frequent operation.

    lines: RDD of tab-separated log records; field 5 is read as the site
        and field 6 as the operation.
    """
    def site_and_operation(line):
        fields = line.split("\t")
        return fields[5], fields[6]

    # Group all operations by site.
    site_operations = lines.map(site_and_operation).groupByKey()
    result = site_operations.map(getCurrSiteTopOperation).collect()
    for em in result:
        print(em)
def getCurrSiteTopOperation(one):
    """Return the most frequent operation of one site.

    one: (site, operations) pair, where operations is an iterable holding
        one operation value per record of that site.

    Returns (site, top), where top is a one-element list containing the
    (operation, count) tuple with the highest count, or an empty list when
    there are no operations.
    """
    site = one[0]
    operations = one[1]
    # Count the occurrences of every operation for this site.
    operationDict = {}
    for operation in operations:
        operationDict[operation] = operationDict.get(operation, 0) + 1
    sortedList = sorted(operationDict.items(), key=lambda kv: kv[1], reverse=True)
    # The slice yields [] for an empty input and [best] otherwise.
    return site, sortedList[:1]
def getTop3User(lines):
    """Print, for every site, its three most active users.

    Approach: group by user to count how often each user visited each
    site, then regroup by site and keep the three highest counts.

    lines: RDD of tab-separated log records; field 4 is read as the user id
        and field 5 as the site.
    """
    def uid_and_site(line):
        fields = line.split("\t")
        # In the 7-field record layout (ip, location, date, timestamp,
        # user id, site, operation) the user id is field 4; field 3 is the
        # millisecond timestamp, which would make nearly every record its
        # own "user". NOTE(review): confirm against the real data file.
        return fields[4], fields[5]

    site_uid_count = lines.map(uid_and_site).groupByKey().flatMap(getSiteInfo)
    # Regroup by site, then take the top three users of each site.
    result = site_uid_count.groupByKey().map(getCurSiteTop3User).collect()
    for em in result:
        print(em)
# Count how often one user visited each site and re-key the counts by site.
def getSiteInfo(one):
    """Turn one user's visited sites into per-site visit counts.

    one: (uid, sites) pair, where sites is an iterable holding one site
        value per record of that user.

    Returns a list of (site, (uid, count)) tuples, one per distinct site.
    """
    uid = one[0]
    sites = one[1]
    siteDict = {}
    for site in sites:
        siteDict[site] = siteDict.get(site, 0) + 1
    return [(site, (uid, count)) for site, count in siteDict.items()]
def getCurSiteTop3User(one):
    """Return the three users with the highest visit count for one site.

    one: (site, uid_counts) pair, where uid_counts is an iterable of
        (uid, count) tuples.

    Returns (site, top3), where top3 is always a 3-element list ordered by
    count, highest first. Users with equal counts keep their input order.
    When fewer than three users exist, the remaining slots hold "".
    """
    site = one[0]
    uid_counts = one[1]
    # sorted() is stable, so with reverse=True equal counts stay in input
    # order - the same result as inserting only on a strictly greater count.
    top3List = sorted(uid_counts, key=lambda uid_count: uid_count[1], reverse=True)[:3]
    # Pad with "" so callers always receive exactly three slots.
    top3List.extend([""] * (3 - len(top3List)))
    return site, top3List
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("pvuv")
    sc = SparkContext(conf=conf)
    try:
        sc.setLogLevel("WARN")
        lines = sc.textFile('../../data/pvuvdata')
        # 1) PV and UV per site.
        pv(lines)
        uv(lines)
        # 2) UV per site, excluding the Beijing location.
        uvExceptBJ(lines)
        # 3) Top 2 most active locations per site.
        getTop2Location(lines)
        # 4) Most frequent operation per site.
        getTopOperation(lines)
        # 5) Top 3 most active users per site.
        getTop3User(lines)
    finally:
        # Always release the context, even when one of the reports fails.
        sc.stop()
手机扫一扫
移动阅读更方便
你可能感兴趣的文章