系统学习数据库day12
（本篇第一部分：将当当网图书搜索结果爬取并存入 MySQL 数据库，完整代码见下。）
图书数据存入数据库
from pymysql import connect, Error
from pymysql.cursors import DictCursor
from lxml import html
import requests
import time
import pandas as pd
# Browser-like User-Agent header so the target site does not reject the request as a bot.
headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36"}
def save_data(data_list):
    """Persist a list of book records into MySQL, one row at a time.

    :param data_list: list of dicts with keys book_name/link/price/store
    :return: None
    """
    db = PythonMySQLTest()
    saved = 0
    for item in data_list:
        db.add_one_data(
            item["book_name"],
            item["link"],
            item["price"],
            item["store"],
        )
        time.sleep(0.5)  # throttle inserts
        saved += 1
        print("成功保存数据{}条".format(saved))
    # Release the connection when done.
    db.close_connection()
class PythonMySQLTest(object):
    """Thin wrapper around a pymysql connection for the `book_info` table."""

    def __init__(self):
        # Database connection object (None if the connection attempt failed).
        self.conn = self.get_connection()

    def get_connection(self):
        """Create and return a pymysql connection, or None on failure."""
        try:
            conn = connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='root',
                database='studydatabase1',
                charset='utf8',          # consistent with PyMySQLLearn; data contains Chinese text
                cursorclass=DictCursor   # rows come back as dicts instead of tuples
            )
            return conn
        except Error as e:
            print('连接失败:{}'.format(e))
            return None

    def close_connection(self):
        """Close the database connection if it is open."""
        try:
            if self.conn is not None:
                self.conn.close()
        except Exception as e:
            print('数据库关闭失败:{}'.format(e))

    def get_student_info_by_no(self, stu_no):
        # Not implemented yet (study-note placeholder).
        pass

    def delete_student_by_no(self, stu_no):
        # Not implemented yet (study-note placeholder).
        pass

    def get_students_by_page(self, page=1, page_size=10):
        # Not implemented yet (study-note placeholder).
        pass

    def add_one_data(self, book_name, link, price, store):
        """Insert one book row; return the affected row count.

        The first %s receives None, i.e. NULL for the auto-increment
        primary-key column of `book_info`.
        """
        sql = 'INSERT INTO `book_info` VALUES (%s, %s, %s, %s, %s);'
        with self.conn.cursor() as cursor:
            row_count = cursor.execute(sql, (None, book_name, link, price, store))
            self.conn.commit()
        return row_count
def spider(isbn, book_list=None):
    """Scrape Dangdang search results for *isbn* and persist them.

    :param isbn: ISBN (or keyword) to search for
    :param book_list: optional list to accumulate results into; a fresh list
        is created per call when omitted.  (Fixes the mutable-default-argument
        pitfall: the original ``book_list=[]`` was shared across calls, so
        repeated calls kept appending to the same list.)
    :return: None
    """
    if book_list is None:
        book_list = []
    url = "http://search.dangdang.com/?key={}&act=input".format(isbn)
    print(url)
    # 获取网页源代码
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        html_data = response.text
        print(type(html_data))
        selector = html.fromstring(html_data)
        print(type(selector))
        ul_list = selector.xpath('//div[@id="search_nature_rg"]/ul/li')
        for li in ul_list:
            book_name = li.xpath('p[@class="name"]/a/text()')[0]
            link = li.xpath('p[@class="name"]/a/@href')[0]
            price = li.xpath('p[@class="price"]/span[@class="search_now_price"]/text()')
            # Sentinel price when the page omits one, so sorting still works.
            price = "¥9999" if len(price) == 0 else price[0]
            price = price.replace("¥", "")
            price = float(price)
            store = li.xpath('p[@class="search_shangjia"]/a/text()')
            # Missing shop link means the item is sold by Dangdang itself.
            store = "当当自营" if len(store) == 0 else store[0]
            book_list.append({
                "book_name": book_name,
                "link": link,
                "price": price,
                "store": store
            })
        # 排序 (cheapest first)
        book_list.sort(key=lambda x: x["price"])
        for book in book_list:
            print(book)
        save_data(book_list)
        # df = pd.DataFrame(book_list)
        # df.to_excel("当当图书.xlsx", index=False)
# xpath提取
# Guard the script entry point so importing this module does not trigger a crawl.
if __name__ == '__main__':
    spider("9787115546081")
保存单张图片
import requests
url = "https://img2.baidu.com/it/u=1441836571,2166773131&fm=26&fmt=auto&gp=0.jpg"
response = requests.get(url)
print(response.status_code)
# Only write the file on success: the original wrote response.content
# unconditionally, which saves an error page as a corrupt image file.
if response.status_code == 200:
    # 获取bytes类型的响应
    img_data = response.content
    # wb 写二进制
    with open('111.png', mode='wb') as f:
        f.write(img_data)
豆瓣即将上映爬虫完整版
from lxml import html
import requests
import pandas as pd
from pymysql import connect, Error
from pymysql.cursors import DictCursor
import time
def spider(location, movie_list=None):
    """Scrape douban "coming soon" movies for *location*, saving cover images.

    :param location: city name in pinyin, e.g. 'shenyang'
    :param movie_list: optional accumulator list; a fresh list is created per
        call when omitted.  (Fixes the mutable-default-argument pitfall: the
        original ``movie_list=[]`` was shared across calls.)
    :return: None

    NOTE(review): cover images are written into './imgs/' — the directory
    must already exist or the open() call raises FileNotFoundError.
    """
    if movie_list is None:
        movie_list = []
    url = 'https://movie.douban.com/cinema/later/{}/'.format(location)
    # 1.获取网页源代码(html_data)
    headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36'}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        html_data = response.text
        # Keep a local copy of the page for offline inspection.
        with open(file='豆瓣即将上映电影.html', mode='w', encoding='utf-8') as f:
            f.write(html_data)
        # 2.xpath提取
        selector = html.fromstring(html_data)
        div_list = selector.xpath('//div[@id="showing-soon"]/div')
        for div in div_list:
            # 电影名
            movie_name = div.xpath('div/h3/a/text()')[0]
            # 上映日期 — strip 月/日 and keep the digits as an int for sorting
            movie_date = div.xpath('div/ul/li[1]/text()')
            movie_date = movie_date[0].replace('月', '')
            movie_date = movie_date.replace('日', '')
            movie_date = int(movie_date)
            # 电影类型
            movie_type = div.xpath('div/ul/li[2]/text()')[0]
            # 上映地点
            movie_place = div.xpath('div/ul/li[3]/text()')[0]
            # 想看人数
            num = div.xpath('div/ul/li[4]/span/text()')
            num = num[0].replace('人想看', '')
            num = int(num)
            # 电影封面 — download and save the poster image
            movie_cover = div.xpath('a/img/@src')[0]
            resp1 = requests.get(movie_cover)
            if resp1.status_code == 200:
                with open('./imgs/{}.jpg'.format(movie_name), mode='wb') as f:
                    f.write(resp1.content)
                print('保存{}.jpg成功!'.format(movie_name))
            # 生成列表
            movie_list.append({
                'movie_name': movie_name,
                'movie_date': movie_date,
                'movie_type': movie_type,
                'movie_place': movie_place,
                'num': num,
                'movie_cover': movie_cover
            })
        # 排序 (earliest release first)
        movie_list.sort(key=lambda x: x['movie_date'])
        for movie in movie_list:
            print(movie)
        # df = pd.DataFrame(movie_list)
        # # print(df.head())
        # df.to_excel('豆瓣电影.xlsx', index=False)
        # save_data(movie_list)
if __name__ == '__main__':
    # spider() always returns None — drop the unused `a = ...` binding.
    spider('shenyang')
def save_data(data_list):
    """Insert each movie dict from *data_list* into MySQL via PyMySQLLearn.

    :param data_list: list of dicts with keys movie_name/movie_date/
        movie_type/movie_place/num/movie_cover
    :return: None
    """
    db = PyMySQLLearn()
    saved = 0
    for movie in data_list:
        db.add_one_data(
            movie['movie_name'],
            movie['movie_date'],
            movie['movie_type'],
            movie['movie_place'],
            movie['num'],
            movie['movie_cover'],
        )
        time.sleep(0.5)  # throttle inserts
        saved += 1
        print('成功保存数据{}条'.format(saved))
    # Release the connection when done.
    db.close_connection()
class PyMySQLLearn(object):
    """MySQL helper: inserts into `movie_info` plus some student-table stubs."""

    def __init__(self):
        # Private connection handle; None when the connection attempt failed.
        self.__conn = self.get_connection()

    def get_connection(self):
        """Open and return a DictCursor connection, or None on error."""
        params = dict(
            host='localhost',
            port=3306,
            user='root',
            password='root',
            database='studydatabase1',
            charset='utf8',
            cursorclass=DictCursor,
        )
        try:
            return connect(**params)
        except Error as e:
            print('连接失败{}'.format(e))
            return None

    def close_connection(self):
        """Close the connection if one was established."""
        try:
            if self.__conn is not None:
                self.__conn.close()
        except Exception as e:
            print('数据库关闭失败 {}'.format(e))

    # 查询一条数据
    def get_one_data(self):
        pass

    # 删除一条数据
    def delete_one_data(self, row_id):
        pass

    # 通过stu_no查询学生信息
    def get_student_info_by_no(self, stu_no):
        pass

    # 通过分页查询学生信息
    def get_student_by_page(self, page=1, page_size=10):
        pass

    # 添加一条数据
    def add_one_data(self, movie_name, movie_date, movie_type, movie_place, num, movie_cover):
        """Insert one movie row; return the affected row count.

        The leading None is the NULL for the auto-increment primary key.
        """
        sql = 'INSERT INTO `movie_info` VALUES (%s, %s, %s, %s, %s, %s, %s);'
        values = (None, movie_name, movie_date, movie_type, movie_place, num, movie_cover)
        with self.__conn.cursor() as cursor:
            affected = cursor.execute(sql, values)
            self.__conn.commit()
        return affected

    # 删除一个学生信息
    def delete_student_by_no(self, stu_no):
        """Delete the student with *stu_no*; return the affected row count."""
        sql = "delete from school_student_info where stu_no = %s"
        with self.__conn.cursor() as cursor:
            affected = cursor.execute(sql, (stu_no,))
            self.__conn.commit()
        return affected
更多推荐



所有评论(0)