图书数据存入数据库

from pymysql import connect, Error
from pymysql.cursors import DictCursor
from lxml import html
import requests
import time
import pandas as pd
# Desktop-browser User-Agent so the target site serves the normal HTML page
# instead of blocking the request as a bot.
headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36"}

def save_data(data_list):
    """Persist crawled book records into MySQL, one row at a time.

    :param data_list: list of dicts with keys book_name/link/price/store
    :return: None
    """
    obj = PythonMySQLTest()
    # Bail out early if the connection could not be established; otherwise
    # add_one_data would crash with AttributeError on a None connection.
    if obj.conn is None:
        print("数据库连接不可用,保存0条数据")
        return
    count = 0
    try:
        for data in data_list:
            obj.add_one_data(data["book_name"], data["link"],
                             data["price"], data["store"])
            # Throttle inserts slightly.
            time.sleep(0.5)
            count += 1
        print("成功保存数据{}条".format(count))
    finally:
        # Always release the connection, even if an insert failed mid-loop.
        obj.close_connection()

class PythonMySQLTest(object):
    """Thin wrapper around a pymysql connection for the `book_info` table."""

    def __init__(self):
        # Connection object; None if the connection attempt failed.
        self.conn = self.get_connection()

    def get_connection(self):
        """Return a new pymysql connection, or None when it cannot be opened."""
        try:
            conn = connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='root',
                database='studydatabase1',
                # utf8mb4 supports the full Unicode range (4-byte chars),
                # unlike MySQL's legacy 3-byte 'utf8'.
                charset='utf8mb4',
                cursorclass=DictCursor
            )
            return conn
        except Error as e:
            print('连接失败:{}'.format(e))
            return None

    def close_connection(self):
        """Close the connection if one was successfully opened."""
        try:
            if self.conn is not None:
                self.conn.close()
        except Exception as e:
            print('数据库关闭失败:{}'.format(e))

    def get_student_info_by_no(self, stu_no):
        # Placeholder: not implemented yet.
        pass

    def delete_student_by_no(self, stu_no):
        # Placeholder: not implemented yet.
        pass

    def get_students_by_page(self, page=1, page_size=10):
        # Placeholder: not implemented yet.
        pass

    def add_one_data(self, book_name, link, price, store):
        """Insert one book row; return the number of affected rows.

        The first %s is NULL for the auto-increment primary key.
        """
        sql = 'INSERT INTO `book_info` VALUES (%s, %s, %s, %s, %s);'
        with self.conn.cursor() as cursor:
            # Capture the row count while the cursor is still open; the
            # original referenced `cursor` after the with-block closed it.
            row_count = cursor.execute(sql, (None, book_name, link, price, store))
        self.conn.commit()
        return row_count

def spider(isbn, book_list=None):
    """Crawl Dangdang search results for *isbn*, print and persist them.

    :param isbn: ISBN or keyword to search for
    :param book_list: optional list to append results to; a fresh list is
        created per call (fixes the shared mutable-default pitfall of
        ``book_list=[]``, which accumulated results across calls)
    :return: the book list sorted by price, or None if the request failed
    """
    if book_list is None:
        book_list = []
    url = "http://search.dangdang.com/?key={}&act=input".format(isbn)
    print(url)
    # 1. Fetch the page source.
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        html_data = response.text
        print(type(html_data))
        # 2. Extract fields with XPath.
        selector = html.fromstring(html_data)
        print(type(selector))
        ul_list = selector.xpath('//div[@id="search_nature_rg"]/ul/li')
        for li in ul_list:
            book_name = li.xpath('p[@class="name"]/a/text()')[0]
            link = li.xpath('p[@class="name"]/a/@href')[0]
            price = li.xpath('p[@class="price"]/span[@class="search_now_price"]/text()')
            # Fall back to a sentinel price when the node is missing.
            price = "¥9999" if len(price) == 0 else price[0]
            price = float(price.replace("¥", ""))
            store = li.xpath('p[@class="search_shangjia"]/a/text()')
            # Dangdang self-operated items have no shop node.
            store = "当当自营" if len(store) == 0 else store[0]
            book_list.append({
                "book_name": book_name,
                "link": link,
                "price": price,
                "store": store
            })
        # Cheapest first.
        book_list.sort(key=lambda x: x["price"])
        for book in book_list:
            print(book)
        save_data(book_list)
        # df = pd.DataFrame(book_list)
        # df.to_excel("当当图书.xlsx", index=False)
        return book_list

    # xpath提取
spider("9787115546081")

保存单张图片

# Download a single image and save it to disk.
import requests

url = "https://img2.baidu.com/it/u=1441836571,2166773131&fm=26&fmt=auto&gp=0.jpg"
response = requests.get(url)
print(response.status_code)
# Only write the file when the download actually succeeded; the original
# wrote whatever came back regardless of status.
if response.status_code == 200:
    # bytes payload of the response
    img_data = response.content
    # 'wb' = write binary; use a .jpg extension to match the JPEG payload
    # (the original saved JPEG bytes into '111.png').
    with open('111.jpg', mode='wb') as f:
        f.write(img_data)

豆瓣即将上映爬虫完整版

import os
import time

import pandas as pd
import requests
from lxml import html
from pymysql import connect, Error
from pymysql.cursors import DictCursor


def spider(location, movie_list=None):
    """Scrape Douban's 'coming soon' page for *location* and collect movies.

    :param location: city slug used in the Douban URL, e.g. 'shenyang'
    :param movie_list: optional list to append results to; a fresh list is
        created per call (fixes the shared mutable-default pitfall of
        ``movie_list=[]``, which accumulated results across calls)
    :return: the movie list sorted by release day, or None on request failure
    """
    if movie_list is None:
        movie_list = []
    url = 'https://movie.douban.com/cinema/later/{}/'.format(location)
    # 1. Fetch the page source (browser UA avoids bot blocking).
    headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36'}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        html_data = response.text
        # Keep a local snapshot for offline debugging.
        with open(file='豆瓣即将上映电影.html', mode='w', encoding='utf-8') as f:
            f.write(html_data)
        # 2. Extract fields with XPath.
        selector = html.fromstring(html_data)
        div_list = selector.xpath('//div[@id="showing-soon"]/div')
        # Ensure the cover directory exists before the loop writes into it.
        os.makedirs('./imgs', exist_ok=True)
        for div in div_list:
            # Movie title.
            movie_name = div.xpath('div/h3/a/text()')[0]

            # Release day as an int, e.g. '07月16日' -> 716.
            movie_date = div.xpath('div/ul/li[1]/text()')
            movie_date = int(movie_date[0].replace('月', '').replace('日', ''))

            # Genre.
            movie_type = div.xpath('div/ul/li[2]/text()')[0]

            # Region.
            movie_place = div.xpath('div/ul/li[3]/text()')[0]

            # "Want to watch" count as an int.
            num = div.xpath('div/ul/li[4]/span/text()')
            num = int(num[0].replace('人想看', ''))

            # Cover image: download and save under ./imgs/.
            movie_cover = div.xpath('a/img/@src')[0]
            resp1 = requests.get(movie_cover)
            if resp1.status_code == 200:
                with open('./imgs/{}.jpg'.format(movie_name), mode='wb') as f:
                    f.write(resp1.content)
                # Report success only when the file was actually written;
                # the original printed this even for failed downloads.
                print('保存{}.jpg成功!'.format(movie_name))

            movie_list.append({
                'movie_name': movie_name,
                'movie_date': movie_date,
                'movie_type': movie_type,
                'movie_place': movie_place,
                'num': num,
                'movie_cover': movie_cover
            })
        # Earliest release first.
        movie_list.sort(key=lambda x: x['movie_date'])
        for movie in movie_list:
            print(movie)

        # df = pd.DataFrame(movie_list)
        # df.to_excel('豆瓣电影.xlsx', index=False)

        # save_data(movie_list)
        return movie_list





if __name__ == '__main__':
    # Kick off the scrape; the return value was previously bound to an
    # unused variable `a`, so just discard it.
    spider('shenyang')


def save_data(data_list):
    """Write each scraped movie dict into MySQL via PyMySQLLearn.

    :param data_list: list of movie dicts produced by spider()
    :return: None
    """
    db = PyMySQLLearn()
    saved = 0
    for item in data_list:
        db.add_one_data(
            item['movie_name'],
            item['movie_date'],
            item['movie_type'],
            item['movie_place'],
            item['num'],
            item['movie_cover'],
        )
        # Throttle inserts slightly.
        time.sleep(0.5)
        saved += 1
    print('成功保存数据{}条'.format(saved))

    db.close_connection()

class PyMySQLLearn(object):
    """Small pymysql helper for the `movie_info` (and student) tables."""

    def __init__(self):
        # Private connection object; None if the connection attempt failed.
        self.__conn = self.get_connection()

    def get_connection(self):
        """Return a new pymysql connection, or None when it cannot be opened."""
        try:
            conn = connect(host='localhost',
                           port=3306,
                           user='root',
                           password='root',
                           database='studydatabase1',
                           # utf8mb4 supports the full Unicode range (4-byte
                           # chars such as emoji in movie titles); MySQL's
                           # legacy 'utf8' is only 3-byte utf8mb3.
                           charset='utf8mb4',
                           cursorclass=DictCursor)
            return conn

        except Error as e:
            print('连接失败{}'.format(e))
            return None

    def close_connection(self):
        """Close the connection if one was successfully opened."""
        try:
            if self.__conn is not None:
                self.__conn.close()
        except Exception as e:
            print('数据库关闭失败 {}'.format(e))

    # Query a single row (placeholder, not implemented).
    def get_one_data(self):
        pass

    # Delete a single row by id (placeholder, not implemented).
    def delete_one_data(self, row_id):
        pass

    # Query student info by stu_no (placeholder, not implemented).
    def get_student_info_by_no(self, stu_no):
        pass

    # Paged student query (placeholder, not implemented).
    def get_student_by_page(self, page=1, page_size=10):
        pass

    # Insert one movie row; the first %s is NULL for the auto-increment key.
    def add_one_data(self, movie_name, movie_date, movie_type, movie_place, num, movie_cover):
        sql = 'INSERT INTO `movie_info` VALUES (%s, %s, %s, %s, %s, %s, %s);'
        with self.__conn.cursor() as cursor:
            row_count = cursor.execute(sql, (None, movie_name, movie_date, movie_type, movie_place, num, movie_cover))
            self.__conn.commit()
        return row_count

    # Delete one student row by stu_no; returns the affected row count.
    def delete_student_by_no(self, stu_no):
        sql = "delete from school_student_info where stu_no = %s"
        with self.__conn.cursor() as cursor:
            row_count = cursor.execute(sql, (stu_no,))
            self.__conn.commit()
        return row_count
Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐