架构师

您现在的位置是:首页 > 程序人生 > 网络爬虫

网络爬虫

Java爬虫第14课:webmagic自定义Pipeline

架构师小跟班 2020-07-16 网络爬虫
使用和定制Pipeline在WebMagic中,Pileline是抽取结束后,进行处理的部分,它主要用于抽取结果的保存,也可以定制Pileline可以实现一些通用的功能。在这里我们会定制Pipeline实现数

使用和定制Pipeline

在WebMagic中,Pileline是抽取结束后,进行处理的部分,它主要用于抽取结果的保存,也可以定制Pileline可以实现一些通用的功能。在这里我们会定制Pipeline实现数据导入到数据库中

Pipeline输出 

Pipeline的接口定义如下:

public interface Pipeline {
    // ResultItems保存了抽取结果,它是一个Map结构,
    // 在page.putField(key,value)中保存的数据,
    //可以通过ResultItems.get(key)获取
    public void process(ResultItems resultItems, Task task);
}

可以看到,Pipeline其实就是将PageProcessor抽取的结果,继续进行了处理的,其实在Pipeline中完成的功能,你基本上也可以直接在PageProcessor实现,那么为什么会有Pipeline?有几个原因:

1)为了模块分离

“页面抽取”和“后处理、持久化”是爬虫的两个阶段,将其分离开来,一个是代码结构比较清晰,另一个是以后也可能将其处理过程分开,分开在独立的线程以至于不同的机器执行。

2)Pipeline的功能比较固定,更容易做成通用组件

每个页面的抽取方式千变万化,但是后续处理方式则比较固定,例如保存到文件、保存到数据库这种操作,这些对所有页面都是通用的。

在WebMagic框架中,一个Spider可以有多个Pipeline,使用Spider.addPipeline()即可增加一个Pipeline。这些Pipeline都会得到处理,例如可以使用:

spider.addPipeline(new ConsolePipeline()).addPipeline(new FilePipeline())

实现输出结果到控制台,并且保存到文件的目标。

已有的Pipeline

WebMagic中就已经提供了控制台输出、保存到文件、保存为JSON格式的文件几种通用的Pipeline。

类说明备注

ConsolePipeline

输出结果到控制台,抽取结果需要实现toString方法

FilePipeline

保存结果到文件,抽取结果需要实现toString方法

JsonFilePipeline

JSON格式保存结果到文件

ConsolePageModelPipeline

(注解模式)输出结果到控制台

FilePageModelPipeline

(注解模式)保存结果到文件

JsonFilePageModelPipeline

(注解模式)JSON格式保存结果到文件,想持久化的字段需要有getter方法

自定义Pipeline导入数据

自定义SpringDataPipeline

@Component
public class SpringDataPipeline implements Pipeline {
    @Autowired
    private JobInfoService jobInfoService;
    @Override
    public void process(ResultItems resultItems, Task task) {
        //获取需要保存到MySQL的数据
        JobInfo jobInfo = resultItems.get("jobInfo");
        //判断获取到的数据不为空
        if(jobInfo!=null) {
            //如果有值则进行保存
            this.jobInfoService.save(jobInfo);
        }
    }
}

在JobProcessor中修改process()启动的逻辑,添加代码

@Autowired
private SpringDataPipeline springDataPipeline;
public void process() {
    Spider.create(new JobProcessor())
            .addUrl(url)
            .addPipeline(this.springDataPipeline)
            .setScheduler(new QueueScheduler()
                    .setDuplicateRemover(new BloomFilterDuplicateRemover(10000000)))
            .thread(5)
            .run();
}


文章评论