Spark 入门实操

背景

内网服务器的 spark 又跑不动了... 所以在阿里云上买两台机器, 严肃点的部署一下. 顺便把之前的 pyspark 脚本也迁移到 scala 上,可以利用起我们 java 下的资源.

部署环境

  • 机器阿里云 2 台 4 核 16g
  • 依赖软件
    • ansible-2.0
    • spark-2.1
    • jdk-1.8

部署过程

为了后续添加 slave 方便,在 ansible 的脚本上花了很大的功夫。根据 master 以及 slave 的 inventory 配置

  • 自动配置 authorized_keys
  • 自动配置 master 的 ssh_config
  • 自动 配置 nfs, 以及 mount master 的工作区目录

具体 ansible 脚本的执行步骤大致如下:

  • 安装 jdk
  • 安装 spark
  • 生成以及拷贝 spark 的配置文件
    conf/slaves
    配置 各个 slave 的 ssh 的别名
    conf/spark-defaults.conf
    配置 master 的 url
    conf/spark-env.sh
    配置 JAVA_HOME, SPARK_HOME, 各类 MEMORY
  • 配置 master 的 ssh_key, 以及添加到 slave 的 authorized_keys 中
    • spark 的 start-all.sh 中通过 ssh 来启动所有的 slave 的 worker
  • 配置 nfs 共享工作区
    • 运行模式下需要所有的 worker 都能根据地址访问到所要执行的 jar

发布方式

  • 打包 sbt assembly
  • 上传 scp $WORKING_DIR/target/*.jar spark:/home/spark/workspace/

执行方式

crontab 定期调度

$SPARK_HOME/bin/spark-submit --class *** /home/spark/workspace/***

App 示例

val idsRDD = odpsOps.readTable(project, table, pr, read, numPartitions)
  .filter(_.time.isAfter(start)) // 保留最近一个月的访问记录
  .map(r => ((r.dvid, r.time), r.id)) //转换成设备号,访问日期,ID 的格式
  .groupBy(_._1).map(_._2.map(_._2)).map(_.toSet) //根据设备号和访问日期聚合,且仅保留 ID 的信息

val weightRdd = generate(idsRDD)

def listToPair[T](ls:List[T]):List[(T,T)] = ls match {
  case Nil => List()
  case a::ls => ls.map((a,_)) ++ listToPair(ls)
}

def sort(p:(Int,Int)) : (Int,Int) = if (p._1 < p._2) p else p.swap

def toScore(elementNums:Map[Int,Long], pairNums:((Int,Int),Int)) : ((Int,Int), Double) = {
  val ((a,b),n) = pairNums
  ((a,b), n / Math.sqrt(1.0*elementNums(a)*elementNums(b)))
}

def generate(ls:RDD[Set[Int]]) : RDD[((Int,Int),Double)] = {
  val flattened = ls.flatMap(s=>listToPair(s.toList))
  val elementNumbers = ls.flatMap(_.toList).countByValue()
  flattened.groupBy(sort).mapValues(_.size)
    .map(r=>toScore(elementNumbers,r))
    .flatMap(withReverse)
}

def withReverse(res: ((Int,Int), Double)) = {
  val ((activityId, relatedId), weight) = res
  Seq(res, ((relatedId, activityId),weight))
}

执行结果

同样的功能,由于

  • 数据源切换到了阿里云的 odps(内网带宽千兆可以跑满)
  • 减少了 nginx 日志的解析工作(odps 里可以相对结构化的存储信息)

原本 2 小时的执行任务,现在 4 分钟就能搞定...