-
Notifications
You must be signed in to change notification settings - Fork 0
/
电商平台开发文档
395 lines (242 loc) · 15.7 KB
/
电商平台开发文档
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
电商信息平台 开发
hive 表结构分析
2.1 user_visit_action表
/**
* 用户访问动作表
*
* @param date 用户点击行为的日期
* @param user_id 用户的ID
* @param session_id Session的ID
* @param page_id 某个页面的ID
* @param action_time 点击行为的时间点
* @param search_keyword 用户搜索的关键词
* @param click_category_id 某一个商品品类的ID
* @param click_product_id 某一个商品的ID
* @param order_category_ids 一次订单中所有品类的ID集合
* @param order_product_ids 一次订单中所有商品的ID集合
* @param pay_category_ids 一次支付中所有品类的ID集合
* @param pay_product_ids 一次支付中所有商品的ID集合
* @param city_id 城市ID
*/
1. 点击Session
2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:04:42,null,37,17,null,null,null,null,7
搜索session
2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:29:50,重庆小面,-1,-1,null,null,null,null,1
需求一
需求一:各个范围Session步长、访问时长占比统计
访问时长:session的最早时间与最晚时间之差。
访问步长:session中的action个数。
统计出符合筛选条件的session中,访问时长在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m,访问步长在1_3、4_6、…以上各个范围内的各种session的占比。
访问时长在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m 的个数 m 在每个session中统计 (startTime - stopTime)
访问步长在1_3、4_6、…以上各个范围内的各种session的占比。 可以在每个session中统计 一个session 出现多少次 就对应多少个步长
1.找出所有时间条件符合条件的session
从表里面查询 满足时间的 并转换成RDD
actionRDD
要另外的几个条件过滤 因为另外几个条件在userinfo 表里面 session 表里面只有userid 所有 后面必须要join 联立
而且过滤 涉及到
keywords:"", \
categoryIds:"", \
查询的 关键词
分类ids
UserVisitAction(2019-06-10,14,70611e5c0be84bb89d2f209fcaf2800c,7,2019-06-10 18:31:57,null,51,42,null,null,null,null,7)
UserVisitAction(2019-06-10,14,70611e5c0be84bb89d2f209fcaf2800c,5,2019-06-10 18:56:43,null,-1,-1,null,null,2,97,2)
UserVisitAction(2019-06-10,14,70611e5c0be84bb89d2f209fcaf2800c,7,2019-06-10 18:35:09,null,-1,-1,64,10,null,null,3)
sessionId2GroupRDD 按 sessionId 聚合 (sessionId , iterable)
sessionId2FullInfo = sessionId2GroupRDD .map (userId -> fullInfo )
sessionId2FullInfo = userId2AggrInfoRDD join (useriD,userInfo)
(51,sessionid=6c2b79abeb5443f580039b2c292b71b4|searchKeywords=卫生纸,机器学习,华为手机,小龙虾,保温杯,联想笔记本,吸尘器,洗面奶,苹果|clickCategoryIds=9,0,36,36,71,52,81,94,47,46,6,87,90,42,98,60,90,9,81,2|visitLength=600|stepLength=90|startTime=2019-06-10 13:00:03)
sessionId2FullInfo =
(de154b697eca433ab12ecf8dba58ac36,sessionid=de154b697eca433ab12ecf8dba58ac36|searchKeywords=华为手机,苹果,Lamer,小龙虾,洗面奶|clickCategoryIds=0,26,61,38,29|visitLength=1062|stepLength=19|startTime=2019-06-10 09:02:54|age=32|professional=professional96|sex=female|city=city84)
(d4b3f4146d40424b9012d20bc6d56595,sessionid=d4b3f4146d40424b9012d20bc6d56595|searchKeywords=|clickCategoryIds=11,77,1|visitLength=2382|stepLength=6|startTime=2019-06-10 02:15:14|age=32|professional=professional96|sex=female|city=city84)
累加器 经过filterDate 以后
k = 30_60, value = 152
k = 60, value = 229
k = 1m_3m, value = 18
k = 3m_10m, value = 72
k = 10m_30m, value = 196
k = 7_9, value = 11
k = session_count, value = 540
k = 4_6, value = 13
k = 30s_60s, value = 6
k = 1_3, value = 15
k = 10_30, value = 116
k = 30m, value = 244
k = 7s_9s, value = 1
k = 10s_30s, value = 2
需求二:Session随机抽取
做什么
明确一共要抽取多少session 100
明确每天要抽取多少session extractPerDay
明确每天有多少session val dateSessionCount = date2HourCountMap.values.sum
明确每小时有多少session count
明确每小时抽取多少session var hourExrCount = ((count / dateSessionCount.toDouble) * extractPerDay).toInt
根据每小时抽取数量生成随机索引 generateRandomIndexList
按照随机索引抽取实际的一个小时中的session
怎么做
session2FullinfoRDD (sessionId,fullinfo) == map ==> (dateHour,info) == countByKey ==> Map(dateHour,count) == map ==>(date,(hour,count)) ==map==>(date(hour,list))
==> 抽取结果 ==>写入mysql
//抽取数据
// date2HourCountMap: Map[(date, Map[(hour, List)])]
// dateHour2FullInfo (dateHour,fullinfo)
(dateHour,iterable) => 这里就可以抽取了
val dateHour2GroupRDD = dateHour2FullInfo.groupByKey()
/**
* Session随机抽取表
*
* @param taskid 当前计算批次的ID
* @param sessionid 抽取的Session的ID
* @param startTime Session的开始时间
* @param searchKeywords Session的查询字段
* @param clickCategoryIds Session点击的类别id集合
*/
需求三
在符合条件的session中,获取点击、下单和支付数量排名前10的品类。在Top10的排序中,按照点击数量、下单数量、支付数量的次序进行排序,即优先考虑点击数量。
case class Top10Category(taskid:String,
categoryid:Long,
clickCount:Long,
orderCount:Long,
payCount:Long)
怎么做:
获取所有发生过点击、下单、付款的品类
获取各个categoryId的点击次数
获取各个categoryId的下单次数
获取各个categoryId的付款次数
先统计所有categoryId 的所有各种行为次数 (categoryId,num)
在与符合条件的过滤之后的(categoryId,categoryId) left join 操作
(20,categoryid=20|clickCount=71|orderCount=67|payCount=70)
(12,categoryid=12|clickCount=55|orderCount=70|payCount=72)
根据点击、下单、付款次数排序
取排序后Top10的品类
整理结构,写入MySQL数据库
// sessionId2ActionRDD: RDD[(sessionId, action)]
// sessionId2FilterRDD : RDD[(sessionId, FullInfo)] 符合过滤条件的
// sessionId2FilterActionRDD: join
1 sessionId2Action join sessionId2Fullinto
可以得到 符合条件的 sessionId2FilterActionRDD
(SortKey(85,76,64),categoryid=65|clickCount=85|orderCount=76|payCount=64)
(SortKey(85,63,65),categoryid=84|clickCount=85|orderCount=63|payCount=65)
(SortKey(85,57,63),categoryid=83|clickCount=85|orderCount=57|payCount=63)
(SortKey(84,83,71),categoryid=86|clickCount=84|orderCount=83|payCount=71)
需求四
做什么
统计需求三中得到的Top10热门品类中的Top10活跃Session,对Top热门品类中的每个品类都取Top10活跃Session。
怎么 做
符合过滤条件的用户行为数据
filter or join => 点击过Top10品类的所有Session
Top10热门品类
点击过Top10品类的所有Session
每个session对Top10品类的点击次数
取点击次数处于Top10的session
需求五
需求五:页面单跳转化率统计
做什么
我们需要去分析每一次的页面访问流程,也就是用户从进入网站到离开网站这个流程中所访问的页面顺序,也就是一个session中的页面访问顺序。
假如一个session的页面访问顺序为1,2,3,4,5,那么他访问的页面切片就是1_2,2_3,3_4,4_5,如果得到所有session的切面切片m_n,就可以计算每一种页面切片的个数,即count(m_n),就可以计算每一个页面单跳转化率。
那么如何获得一个session的访问顺序?
怎么做
第一 得到 页面流的路劲
第二 转化为页面切片
第三 用户的访问行为(sessionId,action)
第四 用户的访问行为聚合 (sessionId,iterableAction)
第五 对action 中的action_time 排序
第五 map ==> (page)
第五 map ==> (pageSplit)
第七 过滤 ==>(realSplit)
第六 map == >(realSplit,1)
第七 聚合 (realSplit,m)
需求五
做什么:
统计各个区域中Top3的热门商品。热门商品的评判指标是商品被点击的次数,对于user_visit_action表,click_product_id表示被点击的商品。
怎么做:
用户访问行为表 -----> city_id click_product_id
------>city_id city_name area click_product_id | 0| 北京| 华北| 95|
| 0| 北京| 华北| 7|
城市信息 -----> city_id city_name area
↓
city_infos area click_product_id click_count
↓ 商品信息表
area city_infos click_product_id click_count product_name product_status
+----+----------+---+------------+--------------+-----------+
|area|city_infos|pid|product_name|product_status|click_count|
+----+----------+---+------------+--------------+-----------+
| 西北| 7:西安| 43| product43| Third Party| 9|
| 西南| 8:成都| 70| product70| Self| 17|
| 东北| 9:哈尔滨| 77| product77| Self| 14|
| 华中| 5:武汉,6:长沙| 40| product40| Self| 31|
| 华中| 5:武汉,6:长沙| 57| product57| Third Party| 23|
| 西南| 8:成都| 62| product62| Self| 7|
| 华东| 1:上海,2:南京| 73| product73| Third Party| 32|
| 西南| 8:成都| 65| product65| Self| 18|
| 华中| 5:武汉,6:长沙| 0| product0| Third Party| 36|
| 华北| 0:北京| 50| product50| Third Party| 13|
| 华南| 3:广州,4:三亚| 45| product45| Self| 35|
| 东北| 9:哈尔滨| 52| product52| Self| 12|
| 华东| 1:上海,2:南京| 79| product79| Third Party| 28|
select area , pid, group_concat_distinct(concat_long_string(city_id,city_name,':')) city_infos, count(pid) from tmp_area_basic_info group by area,pid
udaf 聚合 函数 相当于 每一行都会传进去 进行运算
需求六 广告点击量实时统计
数据格式 timestamp province city userid adid
kafka 不断消费过来数据
先把DStram 到黑名单列表 里面去过滤一下 过滤掉在黑名单里面的数据
// 根据每一个RDD里面的数据,更新用户点击次数表
// 更新之后 反查出来看看是否大于100 把大于的存下来
// 加入黑名单
代码中 几个思考的问题
一、为什么要cache?
Spark计算框架的一大优势就是对迭代计算的强大支持。由于spark中的RDD都是只读不可变的对象,也就是RDD的每一个transformation操作都会产生一个新的RDD。所以Spark任务中的一个优化原则就是避免创建重复的RDD而尽量复用同一个RDD。
关键 就是这一句 : 从而根据action操作需要的RDD及其依赖的所有RDD转换操作形成实际的任务。也就是会从源头输入数据开始执行整个计算过程
如果像编写单机程序一样,以为复用RDD只需要在不同的迭代计算中引用同一个RDD即可,在查看spark UI中的任务日志时会发现同一份输入数据可能被多次重复读取。这与spark的RDD计算原理有关:spark中一个job是由RDD的一连串transformation操作和一个action操作组成。只有当执行到action操作代码时才会触发生成真正的job,从而根据action操作需要的RDD及其依赖的所有RDD转换操作形成实际的任务。也就是会从源头输入数据开始执行整个计算过程
Scala Option(选项)类型用来表示一个值是可选的(有值或无值)。
Option[T] 是一个类型为 T 的可选值的容器: 如果值存在, Option[T] 就是一个 Some[T] ,如果不存在, Option[T] 就是对象 None 。
使用累加器也是 如果是在一个方法里面 那么这个方法 务必要cache
获取 参数 在此项目中有两种方法 一种是 val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
第二种 val startDate = taskParam.get(Constants.PARAM_START_DATE)
统计出 所有在session的个数 n
sc.bordcast
广播 的作用是 提升性能 分发给每一个worker
zip 拉链 多余的会直接过滤掉
tail 除了第一个后面全是
DStream :
无状态转换操作实例:我们之前“套接字流”部分介绍的词频统计,就是采用无状态转换,每次统计,都是只统计当前批次到达的单词的词频,和之前批次无关,不会进行累计。
无状态 转换
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如, reduceByKey() 会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
map
这个是对RDD 里面的每个元素 例如RDD[String]
String 做操作
最后得到一个人新的RDD
filter
def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {}
这个是对RDD 里面的每个元素 例如RDD[String]
做操作 一样的
reduceByKey
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative and commutative reduce function. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
这个也只是 针对每个RDD 操作 即不是全局的,
transform
该函数每一批次调度一次。
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
这个是对RDD 进行操作
当我们需要在跨批次之间维护状态时,就必须使用updateStateByKey操作。
updateStateByKey
4.4.2.1 追踪状态变化UpdateStateByKey
UpdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
4.4.2.2 Window Operations
4.5 DStreams输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。
可以重用我们在 Spark 中实现的所有行动操作
foreachRDD
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
map /** Return a new DStream by applying a function to all elements of this DStream. */
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = ssc.withScope {