卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章11201本站已运行3223

[Python 原创] 抖音爬虫(主页、喜欢列表全部下载)


说明
以下代码仅供交流,主要爬取抖音单个用户的主页或喜欢中涉及的所有视频;
文件夹内共以下内容:
  1. 爬虫:douyin_spider.py
  2. 多线程下载器:douyin_download_N_thread.py
  3. data.json:记录爬取到的信息,供下载器使用
  4. 文件夹download_files:储存下载的视频文件
爬虫代码:(data里面还有很多信息,自己可以打印出来看看)

import requests
import json
import time
import os

os.chdir(os.path.dirname(os.path.realpath(__file__)))
def get_data(sec_uid,max_cursor,mode):
    headers = {
        'Connection': 'keep-alive',
        'Accept': 'application/json, text/plain, */*',
        'Agw-Js-Conv': 'str',
        'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Mobile Safari/537.36',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Dest': 'empty',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    params = {
        'reflow_source': 'reflow_page',
        'sec_uid': sec_uid,
        'count': '100',
        'max_cursor': max_cursor,
    }
    response = requests.get(f'https://m.douyin.com/web/api/v2/aweme/{mode}/', params=params, headers=headers)
    data = response.json()
    for d in data['aweme_list']:
        output = {}
        output['title'] = d['desc']
        output['VideoUrl'] = d['video']['play_addr']['url_list'][-1]
        output['img'] = d['video']['dynamic_cover']['url_list'][-1]
        output['id'] = d['aweme_id']
        result.append(output)
        print(output['title'])
    return data['max_cursor'],data['has_more']

def run(sec_uid,mode='post'):
    max_cursor = '0'
    for i in range(100):
        x = get_data(sec_uid,max_cursor,mode)
        if x[1]:
            max_cursor = x[0]
            get_data(sec_uid,max_cursor,mode)
        else:
            break

if __name__ == '__main__':
    result = []
    choice = input('''
*******************************************************
    请选择需要下载的类别序号(1/2):
    1、该账号主页的所有视频;
    2、改账号喜欢列表所有视频。
*******************************************************\n
    ''')
    sec_uid = input('''
*******************************************************
    请选择需账号的sec_uid码:
    例如:主页链接(PC网页打开)
    链接:https://www.douyin.com/user/MS4wLjABAAAAfeHUJALUV_hro9kN7QT5I9pe9DNVDSkiCTiqfK0ziZo?vid=7151405922777107753
    sec_uid码:MS4wLjABAAAAfeHUJALUV_hro9kN7QT5I9pe9DNVDSkiCTiqfK0ziZo
*******************************************************\n''')
    sel = {'1':'post','2':'like'}
    mode = sel[choice]
    path = f'data.json'
    try:        
        run(sec_uid,mode)
        print('完成搜索数量:',len(result))
        with open(path,'w',encoding='utf-8') as f:
            json.dump(result,f,indent=4, ensure_ascii=False)
    except:
        print('完成搜索数量(lost_part):',len(result))
        with open(path,'w',encoding='utf-8') as f:
            json.dump(result,f,indent=4, ensure_ascii=False)
    time.sleep(3)
 


多线程下载器代码

import queue
import threading
import time
import requests
import json
import re
import os

os.chdir(os.path.dirname(os.path.realpath(__file__)))

class myThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        # print ("开启线程:" + self.name)
        process_data(self.name, self.q)
        # print ("退出线程:" + self.name)

def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            task_arg = q.get()
            queueLock.release()
            main(task_arg)

            print ("%s processing %s" % (threadName, '*'*20))
        else:
            queueLock.release()
        time.sleep(1)

def thread_task(threadList,task_args,n):
    global workQueue,queueLock,exitFlag
    queueLock = threading.Lock()
    workQueue = queue.Queue(n)
    exitFlag = 0
    threads = []
    threadID = 1
    # 创建新线程
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1
    # 填充队列
    queueLock.acquire()
    for task_arg in task_args:
        workQueue.put(task_arg)
    queueLock.release()
    # 等待队列清空
    while not workQueue.empty():
        pass
    # 通知线程是时候退出
    exitFlag = 1
    # 等待所有线程完成
    for t in threads:
        t.join()
    print("退出主线程")

def main(data):  
        # print(data)      
        url = data['VideoUrl']
        id = data['id']
        title = data['title']
        if title == "":
            name = id
        else:
            name = title
        intab = r'[?*/\|.:><]'
        name = re.sub(intab, "", name).replace(" ","")
        try:
            response = requests.get(url, headers=headers)
        except:
            print('网页请求错误:\n','*'*100+'\n',name+'\n','*'*100)
        try:
            with open (f'{outout_dir}//{name}.mp4','wb') as b:
                b.write(response.content)
            print('已下载:',name)
        except:
            print('下载错误:\n','*'*100+'\n',name+'\n','*'*100)

if __name__ == '__main__':
    threadList = []
    for i in range(50):
        threadList.append(f'Thread-{i+1}')
    outout_dir = r'./download_files'
    from faker import Factory
    fake = Factory().create('zh_CN')
    headers = {'User-Agent': fake.user_agent()}
    with open('data.json','r',encoding='utf-8') as f:
        data_json = json.load(f)
    task_args = data_json
    thread_task(threadList, task_args,len(task_args))

 

 
douyin.zip
440651215d214b1a3572c9254b04ff22.zip (58.55 KB)
卓越飞翔博客
上一篇: [python]一键断网联网,含bat代码
下一篇: 【python】写真网站小姐姐图片集合打包下载
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏