基于spark2.0文本分词+多分类模型

栏目: 服务器 · 发布时间: 5年前

内容简介:spark2.0开始引入dataframe作为RDD的上层封装,以屏蔽RDD层次的复杂操作,本文使用spark milib中ml机器学习库进行新闻文本多分类预测,包含数据预预处理,分词,标签和特征向量化转换、多分类模型训练(包含朴素贝叶斯、逻辑回归、决策树和随机森林),多分类模型预测和模型评估等完整的机器学习demo。本文分词方法选用HanLP分词工具(文档丰富、算法公开、代码开源,并且经测试分词效果比较好)。本文使用的数据为4类新闻,每条数据包含标签,标题,时间和新闻内容,以"\u00EF"符号作为分割符

文本分类

spark2.0开始引入dataframe作为RDD的上层封装,以屏蔽RDD层次的复杂操作,本文使用spark milib中ml机器学习库进行新闻文本多分类预测,包含数据预预处理,分词,标签和特征向量化转换、多分类模型训练(包含朴素贝叶斯、逻辑回归、决策树和随机森林),多分类模型预测和模型评估等完整的机器学习demo。本文分词方法选用HanLP分词工具(文档丰富、算法公开、代码开源,并且经测试分词效果比较好)。

1.数据预处理

1.1文本数据

本文使用的数据为4类新闻,每条数据包含标签,标题,时间和新闻内容,以"\u00EF"符号作为分割符,数据格式如下:

基于spark2.0文本分词+多分类模型

1.2预处理流程

文本清洗->标签索引化->内容文本分词->去除停用词->分词取前5000个词作为特征->特征向量化->保存预处理模型->调用预处理模型->输出预处理数据(indexedLabel,features)

1.3标签索引化

首先将文本读取成Dataframe格式,将标签列数据索引化,{文化,经济,军事和体育}向量化后为{0,1,2,3}

/**
    * 数据清洗 可根据具体数据结构和业务场景的不同进行重写. 注意: 输出必须要有标签字段"label"
    * @param filePath 数据路径
    * @param spark SparkSession
    * @return 清洗后的数据, 包含字段: "label", "title", "time", "content"
    */
  def clean(filePath: String, spark: SparkSession): DataFrame = {
    import spark.implicits._
    val textDF = spark.sparkContext.textFile(filePath).flatMap { line =>
      val fields = line.split("\u00EF")   //分隔符:ï,分成标签,标题,时间,内容
      //首页|文化新闻ï第十一届全国优秀舞蹈节目展演将在武汉举办ï2016-07-05 19:25:00ï新华社北京7月5日电(记者周玮)由文化部...
      //首页|财经中心|财经频道ï上半年浙江口岸原油进口量创同期历史新高ï2016-07-04 21:54:00ï杭州7月4日...
      if (fields.length > 3) {
        val categoryLine = fields(0)
        val categories = categoryLine.split("\\|")
        val category = categories.last
        //分成4个标签名和其他,最后去除标签为其他的数据
        var label = "其他"
        if (category.contains("文化")) label = "文化"
        else if (category.contains("财经")) label = "财经"
        else if (category.contains("军事")) label = "军事"
        else if (category.contains("体育")) label = "体育"
        else {}
        //输出标签,标题,时间,内容
        val title = fields(1)
        val time = fields(2)
        val content = fields(3)
        if (!label.equals("其他")) Some(label, title, time, content) else None
      } else None
    }.toDF("label", "title", "time", "content")
    //输出标签,标题,时间,内容DF
    textDF
  }
  /**
    * 处理label转换为索引形式
    * @param data 输入label字段的数据
    * @return 标签索引模型, 模型增加字段: "indexedLabel"
    */
  def indexrize(data: DataFrame): StringIndexerModel = {
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)

    labelIndexer
  }
复制代码
predictDF.select("label","indexedLabel").show(10, truncate = false)
复制代码
基于spark2.0文本分词+多分类模型

1.4内容字段分词

处理内容字段,首先要进行分词,然后去除停用词以及转换为特征向量,方便分类模型进行训练和预测。本文模仿spark的ml包下的StopWordsRemover类创建了Segmenter类,用于对数据进行分词,其内部调用了HanLP分词工具。

由于spark自带的StopWordsRemover等使用的闭包仅限于ml包,自定义的类无法调用,故只是采用了与StopWordsRemover类似的使用形式,内部结构并不相同,并且由于以上原因,Segmenter类没有继承Transformer类,故无法进行pipeline管道操作,故在分类模型超参数调优过程中,没有加入分词模型的参数调优。

/**
    * 分词过程,包括"分词", "去除停用词"
    * @param data   输入需要分词的字段的数据"content"
    * @param params 分词参数
    * @return 分词处理后的DataFrame,增加字段: "tokens", "removed"
    */
  def segment(data: DataFrame, params: PreprocessParam): DataFrame = {
    val spark = data.sparkSession
    //设置分词模型
    val segmenter = new Segmenter()
      .setSegmentType(params.segmentType) //分词方式
      .isDelEn(params.delEn)              //是否去除英语单词
      .isDelNum(params.delNum)            //是否去除数字
      .addNature(params.addNature)        //是否添加词性
      .setMinTermLen(params.minTermLen)   //最小词长度
      .setMinTermNum(params.minTermNum)   //行最小词数
      .setInputCol("content")             //输入内容字段
      .setOutputCol("tokens")             //输出分词后的字段
    //进行分词
    val segDF = segmenter.transform(data)
复制代码

1.5去除停用词

分词之后,需要对一些常用的无意义词如:“的”、“我们”、“是”等(统称为“停用词”)进行去除。这些词没有多大的意义,但这些词不去掉会强烈的干扰我们对特征的抽取效果。(比如:在体育分类中,“的”出现500次,“足球”共出现300次,但显然足球更能表示体育分类,而“的”反而影响体育分类的结果。

去除停用词的操作我们直接调用ml包中的StopWordsRemover类:

//读取停用词数据
    val stopWordArray = spark.sparkContext.textFile(params.stopwordFilePath).collect()
    //设置停用词模型
    val remover = new StopWordsRemover()
      .setStopWords(stopWordArray)
      .setInputCol(segmenter.getOutputCol)   //读取"tokens"字段
      .setOutputCol("removed")               //输出删除停用词后的字段"removed"
    //删除停用词
    val removedDF = remover.transform(segDF)
    removedDF
  }
复制代码

1.6特征向量化

由于目前常用的分类、聚类等算法都是基于向量空间模型VSM(即将对象向量化为一个N维向量,映射成N维超空间中的一个点),VSM将数据转换为向量形式,便于对大规模数据进行矩阵操作等,也可以通过计算超空间中两个点之间的距离(一般是余弦距离)来计算两个向量之间的相似度。因此,我们需要将经过处理的语料转换为向量形式,这个过程叫做向量化。

这里我们也调用spark提供的向量化类CountVectorizer类进行向量化操作:

/**
   * 特征向量化处理,包括词汇表过滤
   * @param data   输入向量化的字段"removed"
   * @param params 配置参数
   * @return 向量模型
   */
 def vectorize(data: DataFrame, params: PreprocessParam): CountVectorizerModel = {
   //设置向量模型
   val vectorizer = new CountVectorizer()
     .setVocabSize(params.vocabSize)
     .setInputCol("removed")
     .setOutputCol("features")
   val parentVecModel = vectorizer.fit(data)
   //过滤停用词中没有的数字features
   val numPattern = "[0-9]+".r
   val vocabulary = parentVecModel.vocabulary.flatMap {
     term => if (term.length == 1 || term.matches(numPattern.regex)) None else Some(term)
   }
   val vecModel = new CountVectorizerModel(Identifiable.randomUID("cntVec"), vocabulary)
     .setInputCol("removed")
     .setOutputCol("features")
   vecModel
 }
复制代码

将字段"content"先进行分词和去除停用词得到"removed",再将所有词作为特征,进行特征向量化得到"features"字段:

基于spark2.0文本分词+多分类模型

在模型中可以设置出现次数最多的前5000个词作为分类用的特征,下图5000后有两个数组,第一个数值表示对应前5000个词的第几个词,第二组表示对应第一组出现的词在本条数据中的出现的次数,取出一条完整的数据看看:

基于spark2.0文本分词+多分类模型

1.7数据处理模型训练、保存和调用

为了方便每个模型单独训练和预测,将预处理也作为数据处理的模型进行训练,保存和调用,方法如下:

/**
    * 训练预处理模型
    * @param filePath 数据路径
    * @param spark SparkSession
    * @return (预处理后的数据,索引模型,向量模型)
    *          数据包括字段: "label", "indexedLabel", "title", "time", "content", "tokens", "removed", "features"
    */
  def train(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {

    val params = new PreprocessParam             //预处理参数
    val cleanDF = this.clean(filePath, spark)    //读取DF,清洗数据
    val indexModel = this.indexrize(cleanDF)     //调用索引模型
    val indexDF = indexModel.transform(cleanDF)  //标签索引化
    val segDF = this.segment(indexDF, params)    //将内容字段分词
    val vecModel = this.vectorize(segDF, params) //调用向量模型
    val trainDF = vecModel.transform(segDF)      //内容分词特征向量化
    this.saveModel(indexModel, vecModel, params) //保存模型

    (trainDF, indexModel, vecModel)
  }

  /**
    * 拟合预处理模型
    * @param filePath 数据路径
    * @param spark SparkSession
    * @return (预处理后的数据,索引模型,向量模型)
    */
  def predict(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {

    val params = new PreprocessParam                    //预处理参数
    val cleanDF = this.clean(filePath, spark)           //读取DF,清洗数据
    val (indexModel, vecModel) = this.loadModel(params) //加载索引和向量模型
    val indexDF = indexModel.transform(cleanDF)         //标签索引化
    val segDF = this.segment(indexDF, params)           //内容字段分词
    val predictDF = vecModel.transform(segDF)           //内容分词特征向量化

    (predictDF, indexModel, vecModel)
  }
复制代码

2.多分类模型训练和超参数调优

本文选用了常用的4中多分类模型对文本数据进行训练,利用了管道Pipeline + 网格搜索Gridsearch + 交叉验证CrossValidator 进行参数调优,直接将参数调优放在了训练模型里,将得到的最优模型保存。

2.1朴素贝叶斯

朴素贝叶斯算法原理

朴素贝叶斯算法是基于贝叶斯定理与特征条件独立假设的分类方法。

条件概率

P(A|B)表示事件B已经发生的前提下,事件A发生的概率,叫做事件B发生下事件A的条件概率。其基本求解公式为:

基于spark2.0文本分词+多分类模型

贝叶斯定理便是基于条件概率,通过P(A|B)来求P(B|A):

基于spark2.0文本分词+多分类模型
特征条件独立假设
基于spark2.0文本分词+多分类模型
基于spark2.0文本分词+多分类模型
基于spark2.0文本分词+多分类模型
朴素贝叶斯模型

常用的模型主要有3个,多项式、伯努利和高斯模型:

  • 当特征是离散的时候,使用多项式模型。
  • 伯努利模型也适用于离散特征的情况,所不同的是,伯努利模型中每个特征的取值只能是1和0,以文本分类为例,某个单词在文档中出现过,则其特征值为1,否则为0,而本文是把单词出现的次数作为特征,所以不适应于伯努利模型
  • 当特征是连续变量的时候,多项式模型及时加入平滑系数也很难描述分类特征,因此需要使用高斯模型

平滑系数

超参数平滑系数α,作用是防止后验概率为0,当α = 1时,称作Laplace平滑,当0 < α < 1时,称作Lidstone平滑,α = 0时不做平滑。本文主要对平滑系数进行调参。

/**
    * NB模型训练处理过程
    * @param data 训练数据集
    * @return nbBestModel
    */
  def train(data: DataFrame): NaiveBayesModel = {
    val params = new ClassParam
    //NB分类模型管道训练调参
    data.persist()
    data.show(5)
    //NB模型
    val nbModel = new NaiveBayes()
      .setModelType(params.nbModelType) //多项式模型或者伯努利模型
      .setSmoothing(params.smoothing)   //平滑系数
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
    //建立管道,模型只有一个 stages = 0
    val pipeline = new Pipeline()
      .setStages(Array(nbModel))
    //建立网格搜索
    val paramGrid = new ParamGridBuilder()
      //.addGrid(nbModel.modelType, Array("multinomial", "bernoulli"))
      //伯努利模型需要特征为01的数据
      .addGrid(nbModel.smoothing, Array(0.01, 0.1, 0.2, 0.5))
      .build()
    //建立evaluator,必须要保证验证的标签列是向量化后的标签
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("indexedLabel")
    //建立一个交叉验证的评估器,设置评估器的参数
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(2)
    //运行交叉验证评估器,得到最佳参数集的模型
    val cvModel = cv.fit(data)
    //获取最优逻辑回归模型
    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    val bestNBModel = bestModel.stages(0).asInstanceOf[NaiveBayesModel]
    println("类的数量(标签可以使用的值): " + bestNBModel.numClasses)
    println("模型所接受的特征的数量: " + bestNBModel.numFeatures)
    println("最优的modelType的值为: "+ bestNBModel.explainParam(bestNBModel.modelType))
    println("最优的smoothing的值为: "+ bestNBModel.explainParam(bestNBModel.smoothing))
    //更新最优朴素贝叶斯模型,并训练数据
    val nbBestModel = new NaiveBayes()
      .setModelType(bestNBModel.getModelType) //多项式模型或者伯努利模型
      .setSmoothing(bestNBModel.getSmoothing) //平滑系数
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
      .fit(data)

    this.saveModel(nbBestModel, params)
    data.unpersist()
    nbBestModel
  }
复制代码

后续的三个算法原理网上都有很多,训练的代码也类似,本文只给出模型调参的部分代码。

2.2逻辑回归

//LR模型
    val lrModel = new LogisticRegression()
      .setMaxIter(bestLRModel.getMaxIter)    //模型最大迭代次数
      .setRegParam(bestLRModel.getRegParam)  //正则化参数
      .setElasticNetParam(params.elasticNetParam) //L1范式比例, L1/(L1 + L2)
      .setTol(params.converTol)          //模型收敛阈值
      .setLabelCol("indexedLabel")       //设置索引化标签字段
      .setFeaturesCol("features")        //设置向量化文本特征字段

    //建立网格搜索
    val paramGrid = new ParamGridBuilder()
      .addGrid(lrModel.maxIter, Array(5, 10))
      .addGrid(lrModel.regParam, Array(0.1, 0.2))
      .build()
复制代码

2.3决策树

//决策树模型
    val dtModel = new DecisionTreeClassifier()
      .setMinInfoGain(params.minInfoGain)  //最小信息增益阈值
      .setMaxDepth(params.maxDepth)        //决策树最大深度
      .setImpurity(params.impurity)        //节点不纯度和信息增益方法gini, entropy
      .setLabelCol("indexedLabel")         //设置索引化标签字段
      .setFeaturesCol("features")          //设置向量化文本特征字段
    //建立网格搜索
    val paramGrid = new ParamGridBuilder()
      .addGrid(dtModel.minInfoGain, Array(0.0, 0.1))
      .addGrid(dtModel.maxDepth, Array(10, 20))
      .addGrid(dtModel.impurity, Array("gini", "entropy"))
      .build()
复制代码

2.4随机森林

随机森林模型常常需要调试以提高算法效果的两个参数:numTrees,maxDepth

  • numTrees:增加决策树的个数会降低预测结果的方差,这样在测试时会有更高的accuracy。训练时间大致与numTrees呈线性增长关系
  • maxDepth:限定决策树的最大可能深度。最终的决策树的深度可能要比maxDepth小
  • minInfoGain:最小信息增益(设置阈值),但由于其它终止条件或者是被剪枝的缘故小于该值将不带继续分叉
  • maxBins:连续特征离散化时选用的最大分桶个数,并且决定每个节点如何分裂。(25,28,31)
  • impurity:计算信息增益的指标,熵和gini不纯度("entropy", "gini")
  • minInstancesPerNode:如果某个节点的样本数量小于该值,则该节点将不再被分叉。(设置阈值)
  • auto:在每个节点分裂时是否自动选择参与的特征个数
  • seed:随机数生成种子

实际上要想获得一个适当的阈值是相当困难的。高阈值可能导致过分简化的树,而低阈值可能简化不够。

预剪枝方法 minInfoGain、minInstancesPerNode 实际上是通过不断修改停止条件来得到合理的结果,这并不是一个好办法,事实上 我们常常甚至不知道要寻找什么样的结果。这样就需要对树进行后剪枝了(后剪枝不需要用户指定参数,是更为理想化的剪枝方法)

//随机森林模型(不加fit)
    val rfModel = new RandomForestClassifier()
      .setMaxDepth(params.maxDepth)          //决策树最大深度
      .setNumTrees(params.numTrees)          //设置决策树个数
      .setMinInfoGain(params.minInfoGain)  //最小信息增益阈值
      .setImpurity(params.impurity)        //信息增益的指标,选择熵或者gini不纯度
      //.setMaxBins(params.maxBins)          //最大分桶个数,用于连续特征离散化时决定每个节点如何分裂
      .setLabelCol("indexedLabel")           //设置索引化标签字段
      .setFeaturesCol("features")            //设置向量化文本特征字段
//建立网格搜索
    val paramGrid = new ParamGridBuilder()
      .addGrid(rfModel.maxDepth, Array(5, 10, 20))
      .addGrid(rfModel.numTrees, Array(5, 10, 20))
      .addGrid(rfModel.minInfoGain, Array(0.0, 0.1, 0.5))
      .build()
复制代码

3.多分类模型预测和模型评估

3.1模型评估类MulticlassClassificationEvaluator

机器学期一般都需要一个量化指标来衡量其效果:这个模型的准确率、召回率和F1值(这3个指标是评判模型预测能力常用的一组指标),spark提供了用于多分类模型评估的类MulticlassClassificationEvaluator,并将3个指标同时输出

object Evaluations extends Serializable {
  /**
    * 多分类结果评估
    * @param data 分类结果
    * @return (准确率, 召回率, F1)
    */
  def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double) = {
    val metrics = new MulticlassMetrics(data)
    val weightedPrecision = metrics.weightedPrecision
    val weightedRecall = metrics.weightedRecall
    val f1 = metrics.weightedFMeasure

    (weightedPrecision, weightedRecall, f1)
  }
}
复制代码

3.2四个多分类模型预测结果和模型评估

以逻辑回归为例,预测结果如下图,"probability"中4个值表示4个类别的预测概率:

基于spark2.0文本分词+多分类模型

4个分类模型的评估结果如下:

基于spark2.0文本分词+多分类模型

评估模型代码:

/**
  * Description: 多分类模型预测结果评估对比
  * Created by wy in 2019/4/16 10:07
  */
object MultiClassEvalution {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .master("local")
      .appName("Multi_Class_Evaluation_Demo")
      .getOrCreate()

    val filePath = "data/dataTest/predict"

    //预处理(清洗、分词、向量化)
    val preprocessor = new Preprocessor
    val (predictDF, indexModel, _) = preprocessor.predict(filePath, spark)

    predictDF.select("content","removed", "features").show(1, truncate = false)
    //朴素贝叶斯模型预测
    val nbClassifier = new NBClassifier
    val nbPredictions = nbClassifier.predict(predictDF, indexModel)

    //逻辑回归模型预测
    val lrClassifier = new LRClassifier //import Classification.LogisticRegression.LRClassifier
    val lrPredictions = lrClassifier.predict(predictDF, indexModel)

    //决策树模型预测
    val dtClassifier = new DTClassifier
    val dtPredictions = dtClassifier.predict(predictDF, indexModel)

    //随机森林模型预测
    val rfClassifier = new RFClassifier
    val rfPredictions = rfClassifier.predict(predictDF, indexModel)

    //多个模型评估
    val predictions = Seq(nbPredictions, lrPredictions, dtPredictions, rfPredictions)
    val classNames = Seq("朴素贝叶斯模型", "逻辑回归模型", "决策树模型", "随机森林模型")

    for (i <- 0 to 3) {
      val prediction = predictions(i)
      val className = classNames(i)

      val resultRDD = prediction.select("prediction", "indexedLabel").rdd.map {
        case Row(prediction: Double, label: Double) => (prediction, label)
      }

      val (precision, recall, f1) = Evaluations.multiClassEvaluate(resultRDD)
      println(s"\n========= $className 评估结果 ==========")
      println(s"加权准确率:$precision")
      println(s"加权召回率:$recall")
      println(s"F1值:$f1")
    }
  }
}
复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Learning PHP, MySQL, and JavaScript

Learning PHP, MySQL, and JavaScript

Robin Nixon / O'Reilly Media / 2009-7-21 / USD 39.99

Learn how to create responsive, data-driven websites with PHP, MySQL, and JavaScript - whether or not you know how to program. This simple, streamlined guide explains how the powerful combination of P......一起来看看 《Learning PHP, MySQL, and JavaScript》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

html转js在线工具
html转js在线工具

html转js在线工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具