
Crawling and downloading every video from a video site -- an unofficial site; the actual addresses have been redacted

lj2508_com
2024-04-11 / 0 comments / 0 likes / 189 views / 1,604 words

The approach is as follows. Note: in the end only part of the videos were crawled, not the full site.

1. Get every video title and detail-page URL

Paging through the listing reveals requests whose URL ends in a page number, so each page can be fetched by changing that trailing number.
The response turns out to be HTML; parsing it yields each video's title and detail-page link.
See parse_page_content for the details.
The results are then appended to a CSV file for the next stage; see save_to_csv.
The result is a CSV file with two columns: title and link.
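The paging pattern described above can be sketched like this; the base URL is the redacted placeholder used throughout the post, not a real address:

```python
# Each listing page is fetched by appending the page number to the base URL
base_url = 'https://xxxx.xxx/page/'
page_urls = [base_url + str(page) for page in range(1, 4)]
print(page_urls)  # ['https://xxxx.xxx/page/1', 'https://xxxx.xxx/page/2', 'https://xxxx.xxx/page/3']
```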

2. Get the video URLs

With the detail-page links collected above, we loop over them to find each video's real URL.
Visiting a detail page manually first shows that the player is embedded via an iframe, and the iframe's address changes on every load, so get_video_url extracts the iframe src from the page.
Requesting that player address directly (e.g. from Postman) returns a 404; adding a Referer header makes the request succeed. This looks like the common Referer-based hotlink protection, so all later requests also send a Referer and generally work.
Playing the video then reveals the m3u8 address in the player page's source, and the method extract_info_from_url assembles the final m3u8 URL from it.
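The variables that extract_info_from_url pulls out of the player page can be sketched like this; the page source below is modeled on the post's description, and all values are invented:

```python
import re

# Illustrative player-page source; variable names follow the post, values are made up
source_code = '''
var antiurl = "https://xxx.xxx.xxx";
var token = "abc123";
var m3u8 = '/videos/001/index.m3u8';
'''

token = re.search(r'var\s+token\s*=\s*"([^"]+)"', source_code).group(1)
m3u8 = re.search(r"var\s+m3u8\s*=\s*'([^']+\.m3u8)'", source_code).group(1)

# The final playable address is host + m3u8 path + token
final_url = f"https://xxx.xxx.xxx{m3u8}?token={token}"
print(final_url)  # https://xxx.xxx.xxx/videos/001/index.m3u8?token=abc123
```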

3. Download

Once we have the m3u8 address, we need to download the video files; see download_m3u8. There are three steps:
1. Download the m3u8 playlist, which drives playback; when saving it, the ts.key URL can be rewritten to a local path.
2. Download ts.key, the decryption key.
3. Download all the .ts segment files; these hold the actual video data.
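Rewriting the key URL to a local path (step 1 above) can be sketched like this; the playlist content and addresses are illustrative:

```python
import re

# Illustrative playlist fragment with a remote key URI (address is made up)
m3u8_text = (
    '#EXTM3U\n'
    '#EXT-X-KEY:METHOD=AES-128,URI="https://xxx.xxx.xxx/ts.key"\n'
    '#EXTINF:10.0,\n'
    'index0.ts\n'
)

# Point the key URI at the local ts.key saved next to the segments
local_text = re.sub(r'URI="[^"]*"', 'URI="ts.key"', m3u8_text)
print(local_text)
```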
The original post showed the m3u8 file in screenshots (omitted here). Once the playlist, key, and segments are saved this way, the result plays back locally.
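For reference, an encrypted HLS playlist of the kind described generally looks like the following (an illustrative example based on the standard HLS format, not the original screenshots):

```
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-KEY:METHOD=AES-128,URI="ts.key"
#EXTINF:10.0,
index0.ts
#EXTINF:10.0,
index1.ts
#EXT-X-ENDLIST
```

With URI pointing at the local ts.key and the segment names matching the files on disk, a local player can decrypt and play the stream.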

4. Source code

import requests
from bs4 import BeautifulSoup
import csv
import time
import re
import os


def get_page_content(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response.text
    else:
        return None

def parse_page_content(content):
    soup = BeautifulSoup(content, 'html.parser')
    articles = soup.find_all('article', class_='excerpt')
    data = []
    for article in articles:
        title = article.find('h2').text.strip()
        link = article.find('a', class_='thumbnail').get('href')
        data.append({'title': title, 'link': link})
    return data

def save_to_csv(data, filename):
    # Write the header row once, so csv.DictReader can read the file back later
    write_header = not os.path.isfile(filename)
    with open(filename, 'a', newline='', encoding='utf-8-sig') as csvfile:
        fieldnames = ['title', 'link']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        for item in data:
            writer.writerow({'title': item['title'], 'link': item['link']})

def main():
    base_url = 'https://xxxx.xxx/page/'
    page = 1
    seen_titles = set()  # titles we have already seen
    filename = 'videos.csv'
    while True:
        url = base_url + str(page)
        content = get_page_content(url)
        if content:
            data = parse_page_content(content)
            if not data or any(item['title'] in seen_titles for item in data):
                print('No more data or found duplicate data. Exiting loop.')
                break
            seen_titles.update(item['title'] for item in data)
            save_to_csv(data, filename)
            print(f'Data from page {page} saved successfully.')
            page += 1
            time.sleep(10)  # wait 10 seconds after each request
        else:
            print(f'Failed to fetch page {page} content. Exiting loop.')
            break



def get_video_url(url):
    try:
        # Send a GET request for the detail page
        response = requests.get(url)
        # Parse the HTML with BeautifulSoup
        soup = BeautifulSoup(response.content, 'html.parser')
        # Find all <article> tags holding the post body
        articles = soup.find_all('article', class_='article-content')
        for article in articles:
            # Find all embedded <iframe> tags
            iframes = article.find_all('iframe')
            for iframe in iframes:
                # Read the iframe's src attribute
                src = iframe.get('src')
                # Return the address if it contains the player keyword
                if src and 'xxxx.xxxx/share/' in src:
                    return src
        # No matching address was found
        return None
    except Exception as e:
        print("An error occurred:", e)
        return None


def extract_info_from_url(url):
    try:
        # Send a GET request for the player page
        response = requests.get(url)
        source_code = response.text
        # Debug: dump the page source
        #print(source_code)
        antiurl_match = re.search(r'var\s+antiurl\s*=\s*"(.*?)";', source_code)
        token_match = re.search(r'var\s+token\s*=\s*"([^"]+)"', source_code)
        m3u8_match = re.search(r"var\s+m3u8\s*=\s*'([^']+\.m3u8)'", source_code)
        if antiurl_match and token_match and m3u8_match:
            antiurl = "https://xxx.xxx.xxx" #antiurl_match.group(1)
            token = token_match.group(1)
            m3u8 = m3u8_match.group(1)
            # Join everything into the final playable address
            final_url = f"{antiurl}{m3u8}?token={token}"
            return final_url
        else:
            print("No matching info found.")
            return None
    except Exception as e:
        print("An error occurred:", e)
        return None

def download_m3u8(url, referer, filename):
    headers = {
        'Referer': referer,
        'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
        'Accept':'*/*',
        'Accept-Encoding':'gzip, deflate, br, zstd',
        'Connection':'keep-alive'
    }
    response = requests.get(url, headers=headers)
    m3u8_content = response.text

    # Parse the m3u8 playlist and extract the .ts segment links
    ts_urls = [line.strip() for line in m3u8_content.split('\n') if line.endswith('.ts')]

    # Create a directory to hold the playlist, key, and segments
    if not os.path.exists(filename):
        os.makedirs(filename)
    m3u8Name = os.path.join(filename, "index.m3u8")
    with open(m3u8Name, 'wb') as f:
        f.write(response.content)
    match = re.search(r'(https?://\S+?)/index\.m3u8', url)
    extracted_url = match.group(1)

    key_url = extracted_url + "/ts.key"
    key_response = requests.get(key_url, headers=headers)
    keyName = os.path.join(filename, "ts.key")
    with open(keyName, 'wb') as f:
        f.write(key_response.content)
    # Download the .ts segment files
    for i, ts_url in enumerate(ts_urls):
        ts_url = extracted_url + "/" + ts_url
        ts_filename = f'index{i}.ts'
        ts_filepath = os.path.join(filename, ts_filename)
        with open(ts_filepath, 'wb') as f:
            print(f'Downloading {ts_url}...')
            response = requests.get(ts_url, headers=headers)
            f.write(response.content)

    print('Download completed.')


def get_video(url, filename):
    video_url = get_video_url(url)
    if video_url:
        print("Player URL:", video_url)
        final_url = extract_info_from_url(video_url)
        print("Final video URL:", final_url)
        if final_url:
            download_m3u8(final_url, video_url, "mp4/" + filename.replace('/', '_'))
    else:
        print("No video URL found.")


def download_csv():
    filename = 'videos.csv'
    with open(filename, 'r', newline='', encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            title = row['title']
            link = row['link']
            print(f'Processing video: {title}')
            # Fetch and download this video; adjust as needed
            get_video(link, title)
            time.sleep(2)  # small delay to avoid hammering the site

if __name__ == '__main__':
    # Run main() first to build videos.csv, then download_csv() to fetch the videos
    download_csv()
