我正在使用Scrapy(Python库)来抓取网站并定期生成json输出文件。为了提高效率,我想在每个蜘蛛完成后将这些json文件批量upserts到Mongodb中。
我相信我可以这样做:
mongoimport -c <collection> -d <db> --mode merge --file test.json
但是,我想知道一旦蜘蛛完成后触发此导入的最佳方法是什么?如何?
我希望我可以使用这里描述的close_spider方法:https://doc.scrapy.org/en/latest/topics/item-pipeline.html#writing-your-own-item-pipeline
然而,在玩完它后,我发现只创建了json文件,并且在此方法内部时不会写入。
如果有一些方法让我在某个目录中监听一个新文件,然后执行上面的import语句,那就太好了。
也许,这一切都可以在bash脚本中完成
您可以使用项目管道将项目直接写入Mongo。看看这个例子,来自Scrapy's documentation:
Write items to MongoDB
在这个例子中,我们将使用pymongo将项目写入MongoDB。 MongoDB地址和数据库名称在Scrapy设置中指定; MongoDB集合以item类命名。
这个例子的要点是展示如何使用from_crawler()方法以及如何正确地清理资源:
import pymongo class MongoPipeline(object): collection_name = 'scrapy_items' def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db @classmethod def from_crawler(cls, crawler): return cls( mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DATABASE', 'items') ) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] def close_spider(self, spider): self.client.close() def process_item(self, item, spider): self.db[self.collection_name].insert_one(dict(item)) return item
这个方法适用于我(在你的蜘蛛文件中):
import os
from scrapy import signals
from scrapy.xlib.pydispatch import dispatcher
class MySpider(scrapy.Spider):
def __init__(self):
dispatcher.connect(self.spider_closed, signals.spider_closed)
def spider_closed(self, spider):
os.system("your_command")
一种解决方案是使用pyinotify来查看所选目录中的文件。我从here得到了这个想法,并将其改编为执行mongo import语句。
class MyEventHandler(pyinotify.ProcessEvent):
def process_IN_ACCESS(self, event):
print("ACCESS event:", event.pathname)
def process_IN_ATTRIB(self, event):
print("ATTRIB event:", event.pathname)
def process_IN_CLOSE_NOWRITE(self, event):
print("CLOSE_NOWRITE event:", event.pathname)
def process_IN_CLOSE_WRITE(self, event):
print("CLOSE_WRITE event:", event.pathname)
result = os.system('mongoimport -c kray4 -d kray4 --mode merge --file /home/kevin/PycharmProjects/spider/krawler/output/test.json')
print("Result: " + str(result))
def process_IN_CREATE(self, event):
print("CREATE event:", event.pathname)
def process_IN_DELETE(self, event):
print("DELETE event:", event.pathname)
def process_IN_MODIFY(self, event):
print("MODIFY event:", event.pathname)
def process_IN_OPEN(self, event):
print("OPEN event:", event.pathname)
def main():
# watch manager
wm = pyinotify.WatchManager()
wm.add_watch('/home/kevin/PycharmProjects/spider/krawler/output/test.json', pyinotify.ALL_EVENTS, rec=True)
# event handler
eh = MyEventHandler()
# notifier
notifier = pyinotify.Notifier(wm, eh)
#command = 'echo 1 > /proc/sys/net/ipv4/ip_forward'
notifier.loop()
if __name__ == '__main__':
main()