· 6 years ago · Apr 18, 2020, 09:38 PM
1package com.xxx.xxxx.sendgrid
2
3import com.merlin.dataengineering.spark.common.SparkUtils
4import scala.collection.JavaConverters._
5
/**
 * Spark driver that pages through the SendGrid email API (via a Retrofit
 * [[EmailService]]) and hands each page to `SparkUtils.saveToJson`, which
 * writes the records under `gcsPath` with the `emails_` prefix.
 *
 * Command-line options (unknown options are ignored so the same `args` can
 * also be forwarded to `SparkUtils.createSparkSession`):
 *   - `-p` / `--gcs-path`: output path; cleared before the export starts.
 *   - `-k` / `--api-key`:  SendGrid API key sent with every page request.
 *   - `-s` / `--since`:    unix time in milliseconds; when > 0, only emails whose
 *                          `lastUpdated` is >= this value are kept.
 */
object AllEmailToJson {

  /**
   * Parsed command-line configuration.
   *
   * @param gcsPath output path for the JSON files (cleared before writing)
   * @param apiKey  SendGrid API key
   * @param since   inclusive lower bound (unix ms) on an email's last-updated
   *                time; 0 (the default) disables the filter
   */
  case class Config(gcsPath: String = "xxxxxx",
                    apiKey: String = "xxxxx",
                    since: Long = 0)

  /** Number of records requested per API page. */
  private val PageSize = 250

  def main(args: Array[String]): Unit = {

    // Typed on this object's own Config (previously referenced a non-existent
    // `AllEmailsToJson.Config`, which did not match the value passed to parse).
    val parser = new scopt.OptionParser[Config]("Config") {
      // `args` is also forwarded to SparkUtils.createSparkSession, so options
      // this parser does not declare must not be treated as errors.
      override def errorOnUnknownArgument = false

      opt[String]('p', "gcs-path")
        .action((x, c) => c.copy(gcsPath = x))
        .text("Gcs temporary path")
      opt[String]('k', "api-key")
        .action((x, c) => c.copy(apiKey = x))
        .text("Sendgrid API key")
      opt[Long]('s', "since")
        .action((x, c) => c.copy(since = x))
        .text("Date in unixtime (ms) https://currentmillis.com/")
    }

    // scopt returns None (after printing the error) when an argument is
    // malformed, e.g. a non-numeric --since. Falling back to the default Config
    // would clear the placeholder path and call the API with the placeholder
    // key, so abort with a non-zero exit status instead.
    val conf: Config = parser.parse(args, Config()).getOrElse(sys.exit(1))

    val session = SparkUtils.createSparkSession(
      args,
      "EmailToBigQuery",
      None,
      "src/main/resources/client2.json")

    val gcsPath = conf.gcsPath
    // Clear the output path before writing (presumably removes files left by a
    // previous run — see SparkUtils.clearPath).
    SparkUtils.clearPath(session, gcsPath)

    val since = conf.since

    val client = SparkUtils.createRetroFitClient("https://api.sendgrid.com/")
    val service = client.create(classOf[EmailService])
    val (total, totalFiles) =
      SparkUtils.saveToJson[EmailResponse, EmailWrapper](
        gcsPath,
        "emails_",
        // Fetch one page. `map` is presumably the paging-parameter map returned
        // by the next-page callback below — confirm against saveToJson.
        map => service.getEmailPaged(conf.apiKey, PageSize, map),
        // Extract the records of a page (null if the page or its results are
        // missing), applying the optional `since` filter.
        r =>
          if (r != null && r.getResults != null) {
            val items = r.getResults.asScala
            if (since > 0) items.filter(e => e.getEmail.getLastUpdated >= since)
            else items
          } else null,
        // Build the parameters for the following request from the page's offset
        // when `r.logic` holds; otherwise null (presumably ends paging). Guard
        // against a null page, as the records callback above already does.
        r =>
          if (r != null && r.logic) Map("offset" -> r.getOffset.toString).asJava
          else null,
        session,
        // NOTE(review): meaning of this flag is defined by SparkUtils.saveToJson;
        // consider passing it as a named argument.
        true
      )

    println(s"Total records: ${total}")
    println(s"Total files: ${totalFiles}")

  }
}