socket搭建web服务端
阅读原文时间:2023年07月09日阅读:1

import socket
from threading import Thread
import time

def html(conn):
time_tag = str(time.time())
print(time_tag)
with open('test1.html', 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace('#zzxx#', time_tag)
content = content.encode('utf-8')
conn.send(content)
conn.close()
def css(conn):
with open('test.css', 'rb') as f:
content = f.read()
conn.send(content)
conn.close()
def js(conn):
with open('test.js', 'rb') as f:
content = f.read()
conn.send(content)
conn.close()
def jpg(conn):
with open('934905.jpg', 'rb') as f:
content = f.read()
conn.send(content)
conn.close()
def icon(conn):
with open('bilibili.ico', 'rb') as f:
content = f.read()
conn.send(content)
conn.close()
sk = socket.socket()
sk.bind(("127.0.0.1", 8080))
sk.listen()

urlpath = [
('/', html),
('/test.css', css),
('/test.js', js),
('/934905.jpg', jpg),
('/bilibili.ico', icon),
]

while True:
conn, addr = sk.accept()
msg = conn.recv(1024)
# print(msg.decode("utf-8"))
request_str = msg.decode("utf-8")
path = request_str.split('\r\n')[0].split(' ')[1]
print(path)
conn.send(b"HTTP/1.1 200 ok\r\n\r\n")
# conn.send(b"hello")
for i in urlpath:
if path == i[0]:
# i[1](conn)
t = Thread(target=i[1], args=(conn, ))
t.start()

wsgiref Web框架

from wsgiref.simple_server import make_server
import time
from showdata import showdata
from jinja2 import Template

def html():
# time_tag = str(time.time())
# print(time_tag)
user_info = showdata()
# with open('test1.html', 'r', encoding='utf-8') as f:
with open('jinja2test.html', 'r', encoding='utf-8') as f:
content = f.read()

tem = Template(content)  
content = tem.render({"userinfo": user\_info})

content = content.replace('#zzxx#', user\_info\['name'\])  
content = content.encode('utf-8')  
return content  

def css():
with open('test.css', 'rb') as f:
content = f.read()
return content
def js():
with open('test.js', 'rb') as f:
content = f.read()
return content
def jpg():
with open('934905.jpg', 'rb') as f:
content = f.read()
return content
def icon():
with open('bilibili.ico', 'rb') as f:
content = f.read()
return content
urlpath = [
('/', html),
('/test.css', css),
('/test.js', js),
('/934905.jpg', jpg),
('/bilibili.ico', icon),
]
def application(environ, start_response):
print(environ)
start_response('200 OK', [('k1', 'v1'), ('k2', 'v2')])

path = environ\['PATH\_INFO'\]  
for i in urlpath:  
    if path == i\[0\]:  
        ret = i\[1\]()  
        break  
else:  
    ret = b'404'

# return \[b'<h1>Hello World!<h1>'\]  
return \[ret\]

httpd = make_server('127.0.0.1', 8080, application)
print('Seeving HTTP on port 8080')
httpd.serve_forever()

showdata文件

import pymysql
def showdata():
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='web_test',
charset='utf8'
)

cursor = conn.cursor(pymysql.cursors.DictCursor)  
sql = 'select \* from userinfo;'  
cursor.execute(sql)  
data = cursor.fetchone()  
print(data)  
conn.commit()  
cursor.close()  
conn.close()  
return data

下面是将服务打包起来,更易于管理

manage.py

"""
写服务器逻辑
"""
from wsgiref.simple_server import make_server
from urls import urlpath

def application(environ, start_response):
# print(environ)
start_response('200 OK', [('k1', 'v1'), ('k2', 'v2')])

path = environ\['PATH\_INFO'\]  
print(path)  
for i in urlpath:  
    if path == i\[0\]:  
        ret = i\[1\]()  
        break  
else:  
    ret = b'404'

# return \[b'<h1>Hello World!<h1>'\]  
return \[ret\]

httpd = make_server('127.0.0.1', 8080, application)

print('Seeving HTTP on port 8080')

httpd.serve_forever()

views.py

"""
写业务逻辑
"""

from showdata import showdata
from jinja2 import Template

def html():
# time_tag = str(time.time())
# print(time_tag)
user_info = showdata()
# with open('test1.html', 'r', encoding='utf-8') as f:
with open('templates/jinja2test.html', 'r', encoding='utf-8') as f:
content = f.read()

tem = Template(content)  
content = tem.render({"userinfo": user\_info})  
print("主页")

content = content.replace('#zzxx#', user\_info\['name'\])  
content = content.encode('utf-8')  
return content  

def css():
with open('static/css/test.css', 'rb') as f:
content = f.read()
print("css文件")
return content
def js():
with open('static/js/test.js', 'rb') as f:
content = f.read()
return content
def jpg():
with open('static/img/934905.jpg', 'rb') as f:
content = f.read()
return content
def icon():
with open('static/img/bilibili.ico', 'rb') as f:
content = f.read()
return content

urls.py

"""
路由逻辑
"""
import views

urlpath = [
('/', views.html),
('/static/css/test.css', views.css),
('/static/js/test.js', views.js),
('/static/img/934905.jpg', views.jpg),
('/static/img/bilibili.ico', views.icon)
]

models.py

import pymysql

conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='web_test',
charset='utf8'
)

cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'create table userinfo(id int primary key auto_increment,name char(10), age int' \
' not null);'
cursor.execute(sql)

conn.commit()
cursor.close()
conn.close()