Spark 入门实操
背景
内网服务器的 spark 又跑不动了... 所以在阿里云上买两台机器, 严肃点的部署一下. 顺便把之前的 pyspark 脚本也迁移到 scala 上,可以利用起我们 java 下的资源.
部署环境
- 机器阿里云 2 台 4 核 16g
- 依赖软件
- ansible-2.0
- spark-2.1
- jdk-1.8
部署过程
为了后续添加 slave 方便,在 ansible 的脚本上花了很大的功夫。根据 master 以及 slave 的 inventory 配置
- 自动配置 authorized_keys
- 自动配置 master 的 ssh_config
- 自动 配置 nfs, 以及 mount master 的工作区目录
具体 ansible 脚本的执行步骤大致如下:
- 安装 jdk
- 安装 spark
- 生成以及拷贝 spark 的配置文件
- conf/slaves
- 配置 各个 slave 的 ssh 的别名
- conf/spark-defaults.conf
- 配置 master 的 url
- conf/spark-env.sh
- 配置 JAVA_HOME, SPARK_HOME, 各类 MEMORY
- 配置 master 的 ssh_key, 以及添加到 slave 的 authorized_keys 中
- spark 的 start-all.sh 中通过 ssh 来启动所有的 slave 的 worker
- 配置 nfs 共享工作区
- 运行模式下需要所有的 worker 都能根据地址访问到所要执行的 jar
发布方式
- 打包
sbt assembly
- 上传
scp $WORKING_DIR/target/*.jar spark:/home/spark/workspace/
执行方式
crontab
定期调度
$SPARK_HOME/bin/spark-submit --class *** /home/spark/workspace/***
App 示例
val idsRDD = odpsOps.readTable(project, table, pr, read, numPartitions) .filter(_.time.isAfter(start)) // 保留最近一个月的访问记录 .map(r => ((r.dvid, r.time), r.id)) //转换成设备号,访问日期,ID 的格式 .groupBy(_._1).map(_._2.map(_._2)).map(_.toSet) //根据设备号和访问日期聚合,且仅保留 ID 的信息 val weightRdd = generate(idsRDD) def listToPair[T](ls:List[T]):List[(T,T)] = ls match { case Nil => List() case a::ls => ls.map((a,_)) ++ listToPair(ls) } def sort(p:(Int,Int)) : (Int,Int) = if (p._1 < p._2) p else p.swap def toScore(elementNums:Map[Int,Long], pairNums:((Int,Int),Int)) : ((Int,Int), Double) = { val ((a,b),n) = pairNums ((a,b), n / Math.sqrt(1.0*elementNums(a)*elementNums(b))) } def generate(ls:RDD[Set[Int]]) : RDD[((Int,Int),Double)] = { val flattened = ls.flatMap(s=>listToPair(s.toList)) val elementNumbers = ls.flatMap(_.toList).countByValue() flattened.groupBy(sort).mapValues(_.size) .map(r=>toScore(elementNumbers,r)) .flatMap(withReverse) } def withReverse(res: ((Int,Int), Double)) = { val ((activityId, relatedId), weight) = res Seq(res, ((relatedId, activityId),weight)) }
执行结果
同样的功能,由于
- 数据源切换到了阿里云的 odps(内网带宽千兆可以跑满)
- 减少了 nginx 日志的解析工作(odps 里可以相对结构化的存储信息)
原本 2 小时的执行任务,现在 4 分钟就能搞定...