Spark网站日志过滤分析实例讲解
作者:CarveStone 发布时间:2021-06-08 12:59:24
日志过滤
对于一个网站日志,首先要对它进行过滤,删除一些不必要的信息,我们通过scala语言来实现,清洗代码如下,代码要通过别的软件打包为jar包,此次实验所用需要用到的代码都被打好jar包,放到了/root/jar-files文件夹下:
package com.imooc.log
import com.imooc.log.SparkStatFormatJob.SetLogger
import com.imooc.log.util.AccessConvertUtil
import org.apache.spark.sql.{SaveMode, SparkSession}
/*
数据清洗部分
*/
object SparkStatCleanJob {
def main(args: Array[String]): Unit = {
SetLogger
val spark = SparkSession.builder()
.master("local[2]")
.appName("SparkStatCleanJob").getOrCreate()
val accessRDD = spark.sparkContext.textFile("/root/resources/access.log")
accessRDD.take(4).foreach(println)
val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),AccessConvertUtil.struct)
accessDF.printSchema()
//-----------------数据清洗存储到目标地址------------------------
// coalesce(1)输出指定分区数的小文件
accessDF.coalesce(1).write.format("parquet").partitionBy("day").mode(SaveMode.Overwrite).save("/root/clean")//mode(SaveMode.Overwrite)覆盖已经存在的文件 存储为parquet格式,按day分区
//存储为parquet格式,按day分区
/**
* 调优点:
* 1) 控制文件输出的大小: coalesce
* 2) 分区字段的数据类型调整:spark.sql.sources.partitionColumnTypeInference.enabled
* 3) 批量插入数据库数据,提交使用batch操作
*/
spark.stop()
}
}
过滤好的数据将被存放在/root/clean文件夹中,这部分已被执行好,后面直接使用就可以,其中代码开始的SetLogger功能在自定义类com.imooc.log.SparkStatFormatJob
中,它关闭了大部分log日志输出,这样可以使界面变得简洁,代码如下:
def SetLogger() = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)
System.setProperty("spark.ui.showConsoleProgress", "false")
Logger.getRootLogger().setLevel(Level.OFF);
}
过滤中的AccessConvertUtil
类内容如下所示:
object AccessConvertUtil {
//定义的输出字段
val struct = StructType( //过滤日志结构
Array(
StructField("url", StringType), //课程URL
StructField("cmsType", StringType), //课程类型:video / article
StructField("cmsId", LongType), //课程编号
StructField("traffic", LongType), //耗费流量
StructField("ip", StringType), //ip信息
StructField("city", StringType), //所在城市
StructField("time", StringType), //访问时间
StructField("day", StringType) //分区字段,天
)
)
/**
* 根据输入的每一行信息转换成输出的样式
* 日志样例:2017-05-11 14:09:14 http://www.imooc.com/video/4500 304 218.75.35.226
*/
def parseLog(log: String) = {
try {
val splits = log.split("\t")
val url = splits(1)
//http://www.imooc.com/video/4500
val traffic = splits(2).toLong
val ip = splits(3)
val domain = "http://www.imooc.com/"
//主域名
val cms = url.substring(url.indexOf(domain) + domain.length) //建立一个url的子字符串,它将从domain出现时的位置加domain的长度的位置开始计起
val cmsTypeId = cms.split("/")
var cmsType = ""
var cmsId = 0L
if (cmsTypeId.length > 1) {
cmsType = cmsTypeId(0)
cmsId = cmsTypeId(1).toLong
} //以"/"分隔开后,就相当于分开了课程格式和id,以http://www.imooc.com/video/4500为例,此时cmsType=video,cmsId=4500
val city = IpUtils.getCity(ip) //从ip表中可以知道ip对应哪个城市
val time = splits(0)
//2017-05-11 14:09:14
val day = time.split(" ")(0).replace("-", "") //day=20170511
//Row中的字段要和Struct中的字段对应
Row(url, cmsType, cmsId, traffic, ip, city, time, day)
} catch {
case e: Exception => Row(0)
}
}
def main(args: Array[String]): Unit = {
//示例程序:
val url = "http://www.imooc.com/video/4500"
val domain = "http://www.imooc.com/" //主域名
val index_0 = url.indexOf(domain)
val index_1 = index_0 + domain.length
val cms = url.substring(index_1)
val cmsTypeId = cms.split("/")
var cmsType = ""
var cmsId = 0L
if (cmsTypeId.length > 1) {
cmsType = cmsTypeId(0)
cmsId = cmsTypeId(1).toLong
}
println(cmsType + " " + cmsId)
val time = "2017-05-11 14:09:14"
val day = time.split(" ")(0).replace("-", "")
println(day)
}
}
执行完毕后clean文件夹下内容如图1所示:
日志分析
现在我们已经拥有了过滤好的日志文件,可以开始编写分析代码,例如实现一个按地市统计主站最受欢迎的TopN课程
package com.imooc.log
import com.imooc.log.SparkStatFormatJob.SetLogger
import com.imooc.log.dao.StatDAO
import com.imooc.log.entity.{DayCityVideoAccessStat, DayVideoAccessStat, DayVideoTrafficsStat}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
object TopNStatJob2 {
def main(args: Array[String]): Unit = {
SetLogger
val spark = SparkSession.builder()
.config("spark.sql.sources.partitionColumnTypeInference.enabled", "false") //分区字段的数据类型调整【禁用】
.master("local[2]")
.config("spark.sql.parquet.compression.codec","gzip") //修改parquet压缩格式
.appName("SparkStatCleanJob").getOrCreate()
//读取清洗过后的数据
val cleanDF = spark.read.format("parquet").load("/root/clean")
//执行业务前先清空当天表中的数据
val day = "20170511"
import spark.implicits._
val commonDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
commonDF.cache()
StatDAO.delete(day)
cityAccessTopSata(spark, commonDF) //按地市统计主站最受欢迎的TopN课程功能
commonDF.unpersist(true) //RDD去持久化,优化内存空间
spark.stop()
}
/*
* 按地市统计主站最受欢迎的TopN课程
*/
def cityAccessTopSata(spark: SparkSession, commonDF: DataFrame): Unit = {
//------------------使用DataFrame API完成统计操作--------------------------------------------
import spark.implicits._
val cityAccessTopNDF = commonDF
.groupBy("day", "city", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc) //聚合
cityAccessTopNDF.printSchema()
cityAccessTopNDF.show(false)
//-----------Window函数在Spark SQL中的使用--------------------
val cityTop3DF = cityAccessTopNDF.select( //Top3中涉及到的列
cityAccessTopNDF("day"),
cityAccessTopNDF("city"),
cityAccessTopNDF("cmsId"),
cityAccessTopNDF("times"),
row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
.orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
).filter("times_rank <= 3").orderBy($"city".desc, $"times_rank".asc) //以city为一个partition,聚合times为times_rank,过滤出前三,降序聚合city,升序聚合times_rank
cityTop3DF.show(false) //展示每个地市的Top3
//-------------------将统计结果写入数据库-------------------
try {
cityTop3DF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayCityVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val city = info.getAs[String]("city")
val times = info.getAs[Long]("times")
val timesRank = info.getAs[Int]("times_rank")
list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
})
StatDAO.insertDayCityVideoAccessTopN(list)
})
} catch {
case e: Exception => e.printStackTrace()
}
}
其中保存统计时用到了StatDAO类的insertDayCityVideoAccessTopN()方法,这部分的说明如下:
def insertDayVideoTrafficsTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {
var connection: Connection = null
var pstmt: PreparedStatement = null
try {
connection = MySQLUtils.getConnection() //JDBC连接MySQL
connection.setAutoCommit(false) //设置手动提交
//向day_video_traffics_topn_stat表中插入数据
val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values(?,?,?)"
pstmt = connection.prepareStatement(sql)
for (ele <- list) {
pstmt.setString(1, ele.day)
pstmt.setLong(2, ele.cmsId)
pstmt.setLong(3, ele.traffics)
pstmt.addBatch() //优化点:批量插入数据库数据,提交使用batch操作
}
pstmt.executeBatch() //执行批量处理
connection.commit() //手工提交
} catch {
case e: Exception => e.printStackTrace()
} finally {
MySQLUtils.release(connection, pstmt) //释放连接
}
}
JDBC连接MySQL和释放连接用到了MySQLUtils中的方法
此外我们还需要在MySQL中插入表,用来写入统计数据,MySQL表已经设置好。
下面将程序和所有依赖打包,用spark-submit提交:
./spark-submit --class com.imooc.log.TopNStatJob2 --master spark://localhost:9000 /root/jar-files/sql-1.0-jar-with-dependencies.jar
执行结果:
Schema信息
TopN课程信息
各地区Top3课程信息
MySQL表中数据:
来源:https://blog.csdn.net/weixin_44018458/article/details/128819305
猜你喜欢
- 一、maven引入依赖,数据库驱动根据项目需求自行引入<!-- https://mvnrepository.com/artifact/
- Pom依赖<parent> <groupId>org.springframework.bo
- 使用场景EntityListeners在jpa中使用,如果你是mybatis是不可以用的它的意义对实体属性变化的跟踪,它提供了保存前,保存后
- 本文实例讲述了Java设计模式之抽象工厂模式。分享给大家供大家参考,具体如下:具体工厂类:生产创建某一类具体产品对象。抽象产品类可以使用接口
- 题目:给定一个如下图所示的数字三角形,从顶部出发,在每一结点可以选择移动至其左下方的结点或移动至其右下方的结点,一直走到底层,要求找出一条路
- 什么是命名查询? Hibernate允许在映射文件中定义字符串形式的查询语句,这种查询方式成为命名查询 使用命名查询有什么好处? 由于使用H
- 1.更新同步方式:/** * 三个参数 * the path of the node
- Java读取txt文件内容。可以作如下理解:首先获得一个文件句柄。File file = new File(); file即为文件句柄。两人
- 日期和时间格式由 日期和时间模式字符串 指定。在 日期和时间模式字符串 中,未加引号的字母 'A' 到 'Z'
- 简介方案对比本处列举表示类型或状态的常用方法的对比。法1:使用数字表示(不推荐)//1:支付宝支付;2:微信支付;3:银行卡支付privat
- 本文实例讲述了Java链表(Linked List)基本原理与实现方法。分享给大家供大家参考,具体如下:在分析链表之前,我们先来对之前的动态
- android线程消息机制主要由Handler,Looper,Message和MessageQuene四个部分组成。平常在开发中,我们常用来
- 一. switch分支结构1. 简介switch结合case,能够判断一个变量或表达式与一系列值中的某个值是否相等,这里的每个值都被称为一个
- 了解JVM内存结构的目的在Java的开发过程中,因为有JVM自动内存管理机制,不再需要像在C、C++开发那样手动释放对象的内存空间,不容易出
- 最近开发项目中,有个在屏幕上任意拖动的悬浮窗功能,其实就是利用 WindowManager的api来完成这个需求,具体的实现的功能如下:1.
- Java实现按行读取大文件String file = "F:" + File.separator + "a.t
- 1. dip: device independent pixels(设备独立像素). 不同设备有不同的显示效果,这个和设备硬件有关,一般我们
- Web UI项目中, 很多 Spring controller 视图函数直接返回 html 页面, 还有一些视图函数是要重定向或转发到其他的
- Java获取环境变量Java 获取环境变量的方式很简单: System.getEnv() 得到所有的环境变量Syste
- SimpleDateFormat进行日期格式化1.为啥要用SimpleDateFormat众所周知,Java中的日期类是Date,然后日期默