python之路 之一pyspark
阅读原文时间:2023年07月11日阅读:2

pip包下载安装pyspark

pip install pyspark  这里可能会遇到安装超时的情况   加参数  --timeout=100

pip   -default   -timeout=100     install -U pyspark

下面是我写的一些代码,在运行时,没什么问题,但是目前不知道怎么拿到rdd与dataframe中的值

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext,Row,DataFrame
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

appname = "myappname"
master = "local"
myconf = SparkConf().setAppName(appname).setMaster(master)
sc = SparkContext(conf=myconf)
hc = HiveContext(sc)

构建一个表格 Parallelize a list and convert each line to a Row 将列表并行化并将每行转换为一行

# 构建表可以用applySchema 或者 inferSchema inferSchema已经在1.5之后弃用,由createDataFrame代替
datas = ["1 b 28", "3 c 30", "2 d 29"]
source = sc.parallelize(datas)

splits = source.map(lambda line: line.split(" ")) # 后面是注释
rows = splits.map(lambda words : Row(id=int(words[0]),name=words[1],age=int(words[2])))

myrows = Row(id="a",name="zhangkun",age="28")
#print(myrows.__getitem__(0))
#print(myrows.__getitem__(1))
#print(myrows.__getitem__(2))

Infer the schema,and register the schema as a table 推断架构,并将架构注册为表

fields=[]
fields.append(StructField("id", IntegerType(), True))
fields.append(StructField("name", StringType(), True))
fields.append(StructField("age", IntegerType(), True))
schema = StructType(fields)
people=hc.createDataFrame(myrows,schema); # 1.5之前使用的是inferSchema
# people.printSchema()
people.registerTempTable("people")
# SQL can be run over SchemaRDD that been registered as a table sql 可以在注册过的表上正常运行了
results=hc.sql("select * from people")

#print(results.show)
for i in results :
print(i)
sc.stop()

突然来个新任务,CDH部署大数据分布式平台 ,含以下组建安装:hadoop、hbase、hive、kafka、spark 暂时上面的线搁置,等回头用到在看,主要还是本人基础比较差,需要多学习一些基础。