Python magnet search engine source code released, based on the DHT protocol
Published: 2019-06-19



When I wrote my earlier posts I promised readers that I would find time to publish the crawler's source code, and now it's time to keep that promise. Below is all of the crawler's code, released completely and without reservation. You can use it whether or not you can program; just install a Linux system on a machine with public network access, then run:

python startCrawler.py

A necessary reminder: the database fields all appear in the code, so please create the table yourself; it's simple enough that I won't dwell on it. I'm also providing a download address; the source is all at:
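The crawler talks to a single MySQL table, search_hash, in a database named oksousou (see the mdb.connect call in the code below). The original DDL isn't published, so here is a minimal one-off setup sketch reconstructed from the crawler's SELECT/UPDATE/INSERT statements; the column types, lengths, and index are my assumptions, not the author's schema:

#!/usr/bin/env python
# encoding: utf-8
# Hypothetical schema setup reconstructed from the SQL in startCrawler.py.
# Assumes the 'oksousou' database already exists; adjust types to taste.
import MySQLdb as mdb

conn = mdb.connect('127.0.0.1', 'root', 'root', 'oksousou', charset='utf8')
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS search_hash (
    id          INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    info_hash   CHAR(40) NOT NULL,      -- hex-encoded 20-byte info_hash
    category    VARCHAR(32),            -- from metautils.get_category()
    data_hash   CHAR(32),               -- MD5 of the torrent's 'pieces' blob
    name        VARCHAR(255),
    extension   VARCHAR(32),
    classified  TINYINT(1) DEFAULT 0,
    source_ip   VARCHAR(15),
    tagged      TINYINT(1) DEFAULT 0,
    length      BIGINT,                 -- total content size in bytes
    create_time DATETIME,
    last_seen   DATETIME,
    requests    INT DEFAULT 1,
    UNIQUE KEY uk_info_hash (info_hash)
) DEFAULT CHARSET=utf8
""")
conn.commit()
conn.close()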

#!/usr/bin/env python
# encoding: utf-8
"""
author: haoning
create time: 2015.8.1
"""
import hashlib
import os
import time
import datetime
import traceback
import sys
import random
import json
import socket
import threading
from hashlib import sha1  # SHA-1, used to generate node ids
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
from collections import deque
from Queue import Queue

import MySQLdb as mdb  # MySQL connector

import metautils
import downloadTorrent
from bencode import bencode, bdecode
import pygeoip

DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASS = 'root'

BOOTSTRAP_NODES = (
    ("67.215.246.10", 6881),
    ("82.221.103.244", 6881),
    ("23.21.224.150", 6881)
)
RATE = 1  # throttling rate
TID_LENGTH = 2
RE_JOIN_DHT_INTERVAL = 3
TOKEN_LENGTH = 2
INFO_HASH_LEN = 500000  # 500k entries is modest; caps the queue so memory stays bounded
CACHE_LEN = 100  # size of the database write-back cache
WAIT_DOWNLOAD = 80

geoip = pygeoip.GeoIP('GeoIP.dat')


def is_ip_allowed(ip):
    country = geoip.country_code_by_addr(ip)
    if country in ('CN', 'TW', 'JP', 'HK', 'KR'):
        return True
    return False


def entropy(length):
    return "".join(chr(randint(0, 255)) for _ in xrange(length))


def random_id():
    h = sha1()
    h.update(entropy(20))
    return h.digest()


def decode_nodes(nodes):
    n = []
    length = len(nodes)
    if (length % 26) != 0:
        return n

    for i in range(0, length, 26):
        nid = nodes[i:i+20]
        ip = inet_ntoa(nodes[i+20:i+24])
        port = unpack("!H", nodes[i+24:i+26])[0]
        n.append((nid, ip, port))

    return n


def timer(t, f):
    Timer(t, f).start()


def get_neighbor(target, nid, end=10):
    return target[:end] + nid[end:]


class KNode(object):

    def __init__(self, nid, ip, port):
        self.nid = nid
        self.ip = ip
        self.port = port


class DHTClient(Thread):

    def __init__(self, max_node_qsize):
        Thread.__init__(self)
        self.setDaemon(True)
        self.max_node_qsize = max_node_qsize
        self.nid = random_id()
        self.nodes = deque(maxlen=max_node_qsize)

    def send_krpc(self, msg, address):
        try:
            self.ufd.sendto(bencode(msg), address)
        except Exception:
            pass

    def send_find_node(self, address, nid=None):
        nid = get_neighbor(nid, self.nid) if nid else self.nid
        tid = entropy(TID_LENGTH)
        msg = {
            "t": tid,
            "y": "q",
            "q": "find_node",
            "a": {
                "id": nid,
                "target": random_id()
            }
        }
        self.send_krpc(msg, address)

    def join_DHT(self):
        for address in BOOTSTRAP_NODES:
            self.send_find_node(address)

    def re_join_DHT(self):
        if len(self.nodes) == 0:
            self.join_DHT()
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def auto_send_find_node(self):
        wait = 1.0 / self.max_node_qsize
        while True:
            try:
                node = self.nodes.popleft()
                self.send_find_node((node.ip, node.port), node.nid)
            except IndexError:
                pass
            try:
                sleep(wait)
            except KeyboardInterrupt:
                os._exit(0)

    def process_find_node_response(self, msg, address):
        nodes = decode_nodes(msg["r"]["nodes"])
        for node in nodes:
            (nid, ip, port) = node
            if len(nid) != 20: continue
            if ip == self.bind_ip: continue
            n = KNode(nid, ip, port)
            self.nodes.append(n)


class DHTServer(DHTClient):  # harvests info_hashes

    def __init__(self, master, bind_ip, bind_port, max_node_qsize):
        DHTClient.__init__(self, max_node_qsize)

        self.master = master
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.speed = 0

        self.process_request_actions = {
            "get_peers": self.on_get_peers_request,
            "announce_peer": self.on_announce_peer_request,
        }

        self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.ufd.bind((self.bind_ip, self.bind_port))

        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def run(self):
        self.re_join_DHT()
        while True:
            try:
                (data, address) = self.ufd.recvfrom(65536)
                msg = bdecode(data)
                self.on_message(msg, address)
            except Exception:
                pass

    def on_message(self, msg, address):
        global RATE  # module-level, so it can be reassigned here
        try:
            if msg["y"] == "r":
                if msg["r"].has_key("nodes"):
                    self.process_find_node_response(msg, address)  # nodes discovered
            elif msg["y"] == "q":
                try:
                    self.speed += 1
                    if self.speed % 10000 == 0:
                        RATE = random.randint(1, 3)
                        if RATE == 2:
                            RATE = 1
                        if RATE == 3:
                            RATE = 10
                        if self.speed > 100000:
                            self.speed = 0
                    if self.speed % RATE == 0:  # too much traffic eats CPU, so throttle; RATE cycles through 1, 1, 10
                        self.process_request_actions[msg["q"]](msg, address)  # handle peers' requests; this is where info_hashes arrive
                except KeyError:
                    self.play_dead(msg, address)
        except KeyError:
            pass

    def on_get_peers_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            tid = msg["t"]
            nid = msg["a"]["id"]
            token = infohash[:TOKEN_LENGTH]
            msg = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(infohash, self.nid),
                    "nodes": "",
                    "token": token
                }
            }
            self.master.log(infohash, address)
            self.send_krpc(msg, address)
        except KeyError:
            pass

    def on_announce_peer_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            token = msg["a"]["token"]
            nid = msg["a"]["id"]
            tid = msg["t"]

            if infohash[:TOKEN_LENGTH] == token:
                # bugfix: the key is "implied_port" (the original had a stray trailing space)
                if msg["a"].has_key("implied_port") and msg["a"]["implied_port"] != 0:
                    port = address[1]
                else:
                    port = msg["a"]["port"]
                self.master.log_announce(infohash, (address[0], port))
        except Exception:
            print 'error'
        finally:
            self.ok(msg, address)

    def play_dead(self, msg, address):
        try:
            tid = msg["t"]
            msg = {
                "t": tid,
                "y": "e",
                "e": [202, "Server Error"]
            }
            self.send_krpc(msg, address)
        except KeyError:
            pass

    def ok(self, msg, address):
        try:
            tid = msg["t"]
            nid = msg["a"]["id"]
            msg = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(nid, self.nid)
                }
            }
            self.send_krpc(msg, address)
        except KeyError:
            pass


class Master(Thread):  # resolves info_hashes into torrent metadata

    def __init__(self):
        Thread.__init__(self)
        self.setDaemon(True)
        self.queue = Queue()
        self.cache = Queue()
        self.count = 0
        self.mutex = threading.RLock()  # reentrant lock: the same thread may re-acquire it
        self.waitDownload = Queue()
        self.metadata_queue = Queue()
        self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8')
        self.dbconn.autocommit(False)
        self.dbcurr = self.dbconn.cursor()
        self.dbcurr.execute('SET NAMES utf8')
        self.visited = set()

    def lock(self):
        self.mutex.acquire()

    def unlock(self):
        self.mutex.release()

    def work(self, item):
        print "start thread", item
        while True:
            self.prepare_download_metadata()
            self.lock()
            self.download_metadata()
            self.unlock()

            self.lock()
            self.got_torrent()
            self.unlock()

    def start_work(self, max):
        for item in xrange(max):
            t = threading.Thread(target=self.work, args=(item,))
            t.setDaemon(True)
            t.start()

    # announced torrents are the most productive source, so enqueue them
    def log_announce(self, binhash, address=None):
        if self.queue.qsize() < INFO_HASH_LEN:  # above INFO_HASH_LEN, stop enqueuing or downstream can't keep up
            if is_ip_allowed(address[0]):
                self.queue.put([address, binhash])  # got an info_hash

    def log(self, infohash, address=None):
        if self.queue.qsize() < INFO_HASH_LEN:
            if is_ip_allowed(address[0]):
                self.queue.put([address, infohash])

    def prepare_download_metadata(self):
        if self.queue.qsize() == 0:
            sleep(2)
        # take an info_hash off the queue for downloading
        address, binhash = self.queue.get()
        if binhash in self.visited:
            return
        if len(self.visited) > 100000:  # above 100000 entries, reset the visited set
            self.visited = set()
        self.visited.add(binhash)
        # record this info_hash as visited
        info_hash = binhash.encode('hex')
        utcnow = datetime.datetime.utcnow()

        self.cache.put((address, binhash, utcnow))  # push into the write-back cache

    def download_metadata(self):
        if self.cache.qsize() > CACHE_LEN/2:  # once the cache is half full, drain it
            while self.cache.qsize() > 0:
                address, binhash, utcnow = self.cache.get()
                info_hash = binhash.encode('hex')
                self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
                y = self.dbcurr.fetchone()
                if y:
                    # already known: update last-seen time and request count
                    self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
                else:
                    self.waitDownload.put((address, binhash))
            self.dbconn.commit()
            if self.waitDownload.qsize() > WAIT_DOWNLOAD:
                while self.waitDownload.qsize() > 0:
                    address, binhash = self.waitDownload.get()
                    t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
                    t.setDaemon(True)
                    t.start()

    def decode(self, s):
        if type(s) is list:
            s = ';'.join(s)
        u = s
        for x in (self.encoding, 'utf8', 'gbk', 'big5'):
            try:
                u = s.decode(x)
                return u
            except:
                pass
        return s.decode(self.encoding, 'ignore')

    def decode_utf8(self, d, i):
        if i+'.utf-8' in d:
            return d[i+'.utf-8'].decode('utf8')
        return self.decode(d[i])

    def parse_metadata(self, data):  # parse a torrent's metadata
        info = {}
        self.encoding = 'utf8'
        try:
            torrent = bdecode(data)
            if not torrent.get('name'):
                return None
        except:
            return None
        detail = torrent
        info['name'] = self.decode_utf8(detail, 'name')
        if 'files' in detail:
            info['files'] = []
            for x in detail['files']:
                if 'path.utf-8' in x:
                    v = {'path': self.decode('/'.join(x['path.utf-8'])), 'length': x['length']}
                else:
                    v = {'path': self.decode('/'.join(x['path'])), 'length': x['length']}
                if 'filehash' in x:
                    v['filehash'] = x['filehash'].encode('hex')
                info['files'].append(v)
            info['length'] = sum([x['length'] for x in info['files']])
        else:
            info['length'] = detail['length']
        info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
        return info

    def got_torrent(self):
        if self.metadata_queue.qsize() == 0:
            return
        binhash, address, data, start_time = self.metadata_queue.get()
        if not data:
            return
        try:
            info = self.parse_metadata(data)
            if not info:
                return
        except:
            traceback.print_exc()
            return

        temp = time.time()
        x = time.localtime(float(temp))
        utcnow = time.strftime("%Y-%m-%d %H:%M:%S", x)  # get time now

        info_hash = binhash.encode('hex')  # the magnet hash
        info['info_hash'] = info_hash
        # need to build tags
        info['tagged'] = False
        info['classified'] = False
        info['requests'] = 1
        info['last_seen'] = utcnow
        info['create_time'] = utcnow
        info['source_ip'] = address[0]

        if info.get('files'):
            files = [z for z in info['files'] if not z['path'].startswith('_')]
            if not files:
                files = info['files']
        else:
            files = [{'path': info['name'], 'length': info['length']}]
        files.sort(key=lambda z: z['length'], reverse=True)
        bigfname = files[0]['path']
        info['extension'] = metautils.get_extension(bigfname).lower()
        info['category'] = metautils.get_category(info['extension'])

        try:
            try:
                print '\n', 'Saved', info['info_hash'], info['name'], (time.time()-start_time), 's', address[0]
            except:
                print '\n', 'Saved', info['info_hash']
            ret = self.dbcurr.execute('INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' +
                'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
                (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'],
                info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests']))
            self.count += 1  # bugfix: the original never incremented count, so it committed on every insert
            if self.count % 50 == 0:
                self.dbconn.commit()
                if self.count > 100000:
                    self.count = 0
        except:
            print self.name, 'save error', info
            traceback.print_exc()
            return


if __name__ == "__main__":
    # start the resolver ("client") threads
    master = Master()
    master.start_work(150)

    # start the DHT server
    dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
    dht.start()
    dht.auto_send_find_node()

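For reference, every KRPC message above is a bencoded dictionary carried in a single UDP datagram. Here is a minimal sketch of the find_node query that send_find_node builds, using the same bencode library the crawler imports; the transaction and node ids are made-up printable strings so the wire output stays readable (real ones are raw random bytes):

from bencode import bencode

# What send_find_node() puts on the wire (ids faked for readability;
# key order in the output is sorted, per the bencoding spec).
query = {
    "t": "aa",                  # transaction id: 2 random bytes in the crawler
    "y": "q",                   # message type: query
    "q": "find_node",
    "a": {"id": "a" * 20,       # our 20-byte node id (spoofed to look "near" the peer)
          "target": "b" * 20},  # the node id we pretend to be looking for
}
print bencode(query)
# d1:ad2:id20:aaaaaaaaaaaaaaaaaaaa6:target20:bbbbbbbbbbbbbbbbbbbbe1:q9:find_node1:t2:aa1:y1:qe

# The "nodes" value in a find_node response is compact node info:
# 26 bytes per node (20-byte id + 4-byte IPv4 + 2-byte big-endian port),
# which is exactly what decode_nodes() above unpacks.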
Note that the code above relies on a separate module to download the torrent itself (downloadTorrent, imported at the top), so this next piece of code is essential:

#!/usr/bin/env python
# encoding: utf-8
"""
author: haoning
create time: 2015.8.1
"""
from hashlib import sha1
import math
from socket import inet_ntoa
import socket
from struct import pack, unpack
from threading import Timer, Thread
from time import sleep, time

from bencode import bencode, bdecode

from startCrawler import entropy


BT_PROTOCOL = "BitTorrent protocol"
BT_MSG_ID = 20
EXT_HANDSHAKE_ID = 0


def random_id():
    hash = sha1()
    hash.update(entropy(20))
    return hash.digest()


def send_packet(the_socket, msg):
    the_socket.send(msg)


def send_message(the_socket, msg):
    msg_len = pack(">I", len(msg))
    send_packet(the_socket, msg_len + msg)


def send_handshake(the_socket, infohash):
    bt_header = chr(len(BT_PROTOCOL)) + BT_PROTOCOL
    ext_bytes = "\x00\x00\x00\x00\x00\x10\x00\x00"
    peer_id = random_id()
    packet = bt_header + ext_bytes + infohash + peer_id

    send_packet(the_socket, packet)


def check_handshake(packet, self_infohash):
    try:
        bt_header_len, packet = ord(packet[:1]), packet[1:]
        if bt_header_len != len(BT_PROTOCOL):
            return False
    except TypeError:
        return False

    bt_header, packet = packet[:bt_header_len], packet[bt_header_len:]
    if bt_header != BT_PROTOCOL:
        return False

    packet = packet[8:]
    infohash = packet[:20]
    if infohash != self_infohash:
        return False

    return True


def send_ext_handshake(the_socket):
    msg = chr(BT_MSG_ID) + chr(EXT_HANDSHAKE_ID) + bencode({"m": {"ut_metadata": 1}})
    send_message(the_socket, msg)


def request_metadata(the_socket, ut_metadata, piece):
    """bep_0009"""
    msg = chr(BT_MSG_ID) + chr(ut_metadata) + bencode({"msg_type": 0, "piece": piece})
    send_message(the_socket, msg)


def get_ut_metadata(data):
    # crude parse of the peer's ut_metadata message id out of its extension handshake
    ut_metadata = "_metadata"
    index = data.index(ut_metadata) + len(ut_metadata) + 1
    return int(data[index])


def get_metadata_size(data):
    metadata_size = "metadata_size"
    start = data.index(metadata_size) + len(metadata_size) + 1
    data = data[start:]
    return int(data[:data.index("e")])


def recvall(the_socket, timeout=5):
    the_socket.setblocking(0)
    total_data = []
    data = ""
    begin = time()

    while True:
        sleep(0.05)
        if total_data and time()-begin > timeout:
            break
        elif time()-begin > timeout*2:
            break
        try:
            data = the_socket.recv(1024)
            if data:
                total_data.append(data)
                begin = time()
        except Exception:
            pass
    return "".join(total_data)


def download_metadata(address, infohash, metadata_queue, timeout=5):
    metadata = None
    start_time = time()
    the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        the_socket.settimeout(timeout)
        the_socket.connect(address)

        # handshake
        send_handshake(the_socket, infohash)
        packet = the_socket.recv(4096)

        # handshake error
        if not check_handshake(packet, infohash):
            return

        # ext handshake
        send_ext_handshake(the_socket)
        packet = the_socket.recv(4096)

        # get ut_metadata and metadata_size
        ut_metadata, metadata_size = get_ut_metadata(packet), get_metadata_size(packet)

        # request each piece of metadata; it is transferred in 16 KiB pieces
        metadata = []
        for piece in range(int(math.ceil(metadata_size/(16.0*1024)))):
            request_metadata(the_socket, ut_metadata, piece)
            packet = recvall(the_socket, timeout)  # the_socket.recv(1024*17)
            metadata.append(packet[packet.index("ee")+2:])  # crude: drop the bencoded header, which ends at the first "ee"

        metadata = "".join(metadata)
    except socket.timeout:
        pass
    except Exception:
        pass
    finally:
        # print "metadata= %s" % (metadata)
        the_socket.close()  # make sure the socket is closed on every path
        if metadata != None:  # only enqueue non-empty metadata
            metadata_queue.put((infohash, address, metadata, start_time))
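In the crawler, download_metadata always runs on its own daemon thread and reports back through metadata_queue, and it only enqueues a result when the fetch succeeded. A standalone sketch of calling it directly; the hash and peer address are placeholders, and note that importing downloadTorrent pulls in startCrawler and its dependencies (GeoIP.dat included), so those must be in place:

from Queue import Queue
import threading

import downloadTorrent

metadata_queue = Queue()
binhash = "0123456789abcdef0123456789abcdef01234567".decode('hex')  # placeholder info_hash
address = ("203.0.113.7", 6881)  # placeholder peer; use one your DHT server actually heard from

t = threading.Thread(target=downloadTorrent.download_metadata,
                     args=(address, binhash, metadata_queue))
t.setDaemon(True)
t.start()
t.join(10)  # allow up to ~2x the socket timeout

if not metadata_queue.empty():
    infohash, addr, metadata, start_time = metadata_queue.get()
    print 'got %d bytes of metadata from %s' % (len(metadata), addr[0])
else:
    print 'no metadata (peer unreachable or no ut_metadata support)'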

There is actually another way to download torrents: with the help of libtorrent. But it burns too much CPU, so I generally don't use it. Here it is anyway:

# coding: utf8
import threading
import traceback
import random
import time
import os
import socket

import libtorrent as lt

threading.stack_size(200*1024)
socket.setdefaulttimeout(30)


def fetch_torrent(session, ih, timeout):
    name = ih.upper()
    url = 'magnet:?xt=urn:btih:%s' % (name,)
    data = ''
    params = {
        'save_path': '/tmp/downloads/',
        'storage_mode': lt.storage_mode_t(2),
        'paused': False,
        'auto_managed': False,
        'duplicate_is_error': True}
    try:
        handle = lt.add_magnet_uri(session, url, params)
    except:
        return None
    status = session.status()
    handle.set_sequential_download(1)
    meta = None
    down_time = time.time()
    down_path = None
    for i in xrange(0, timeout):
        if handle.has_metadata():
            info = handle.get_torrent_info()
            down_path = '/tmp/downloads/%s' % info.name()
            # print 'status', 'p', status.num_peers, 'g', status.dht_global_nodes, 'ts', status.dht_torrents, 'u', status.total_upload, 'd', status.total_download
            meta = info.metadata()
            break
        time.sleep(1)
    if down_path and os.path.exists(down_path):
        os.system('rm -rf "%s"' % down_path)
    session.remove_torrent(handle)
    return meta


def download_metadata(address, binhash, metadata_queue, timeout=20):
    metadata = None
    start_time = time.time()
    try:
        session = lt.session()
        r = random.randrange(10000, 50000)
        session.listen_on(r, r+10)
        session.add_dht_router('router.bittorrent.com', 6881)
        session.add_dht_router('router.utorrent.com', 6881)
        session.add_dht_router('dht.transmissionbt.com', 6881)  # bugfix: the original pointed at 'dht.transmission.com'
        session.add_dht_router('127.0.0.1', 6881)
        session.start_dht()
        metadata = fetch_torrent(session, binhash.encode('hex'), timeout)
        session = None
    except:
        traceback.print_exc()
    finally:
        metadata_queue.put((binhash, address, metadata, start_time))
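Since this module exposes the same download_metadata(address, binhash, metadata_queue, timeout) signature as downloadTorrent.py, swapping it in takes a one-line change in startCrawler.py; a sketch, assuming you save the libtorrent variant under the hypothetical name ltDownload.py:

# In startCrawler.py, replace "import downloadTorrent" with:
import ltDownload as downloadTorrent  # hypothetical module name for the libtorrent variant

One behavioral difference: the libtorrent variant always puts a result on the queue, even when metadata is None, while the raw-socket variant only enqueues successful fetches; got_torrent() tolerates both, because it discards empty data.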

This crawler consumed a great deal of time, mine and that of other skilled people around the web. If you're reading this post, please keep up the spirit of study, open source, and sharing, and let's trade notes. I've set up a QQ group as the official group; it isn't big yet, but if you're interested come and have a look — every extra member adds to the fun. QQ group number: 512245829

Reposted from: https://my.oschina.net/u/2303983/blog/685638
