· 7 years ago · Mar 16, 2018, 11:22 PM
1class EMRClient(emrClusterId:String) extends LazyLogging{
2
// Static AWS credentials used to authenticate the EMR client.
// NOTE(review): the key values are placeholders in this listing. Hard-coding real keys in
// source is risky; consider loading them from configuration or the SDK's default
// credentials provider chain instead.
val accessKey = ... // access key
val secretKey = ... // secret key
val credentials = new BasicAWSCredentials(accessKey, secretKey)

// AWS region the client is bound to (placeholder in this listing).
val REGION = <my region>

// Announce initialization through the logger mixed in by LazyLogging instead of raw stdout,
// so the message follows the application's logging configuration.
logger.info(s">>>>>>>>>>>>>>>>>>>>Initializing EMR client for clusterId $emrClusterId . The region is $REGION")

// EMR client built from the static credentials and region above.
val emr = AmazonElasticMapReduceClientBuilder
  .standard()
  .withCredentials(new AWSStaticCredentialsProvider(credentials))
  .withRegion(REGION)
  .build()
13
/**
  * Builds a Hive script step for the given script and submits it to the cluster.
  *
  * @param s3ScriptPath location of the .hql script to run
  * @param stepName     name given to the step
  * @param args         optional argument string passed on to the Hive script step
  * @return the result of the add-job-flow-steps request
  */
def executeHQLStep(s3ScriptPath: String, stepName: String, args: String = ""): AddJobFlowStepsResult = {
  // Pass the method's own parameter; the earlier body referenced `hqlScriptPath`,
  // a name that does not exist in this scope.
  val hqlScriptStep = buildHQLStep(s3ScriptPath, stepName, args)
  val stepSet = new java.util.HashSet[StepConfig]()
  // stepSet.add(enableDebugging)   (debugging step, currently disabled)
  stepSet.add(hqlScriptStep)
  executeJobFlowSteps(stepSet)
}
21
/**
  * Builds a StepConfig to be executed in a job flow for a given .hql file from S3.
  *
  * @param hqlScriptPath the location in S3 of the script file containing the script to run
  * @param stepName      an identifier to give to EMR to name the step
  * @param args          optional argument for the Hive script; a null or blank value is
  *                      treated as "no arguments" and nothing is forwarded to the step
  * @return a step named `stepName` that runs the script, with its action on failure set
  *         to CANCEL_AND_WAIT
  */
private def buildHQLStep(hqlScriptPath: String, stepName: String, args: String = ""): StepConfig = {
  // Only forward a real argument: the default "" would otherwise reach the Hive step
  // as a literal empty-string argument.
  val hiveArgs = Option(args).filter(_.trim.nonEmpty).toSeq
  // NOTE(review): `stepFactory` is not defined in the visible part of this class;
  // it is expected to be declared elsewhere.
  new StepConfig()
    .withName(stepName)
    .withActionOnFailure(ActionOnFailure.CANCEL_AND_WAIT)
    .withHadoopJarStep(stepFactory.newRunHiveScriptStep(hqlScriptPath, hiveArgs: _*))
}
35
/**
  * Submits the given steps to the job flow identified by `emrClusterId`.
  *
  * @param steps the step configurations to add
  * @return the result returned by the EMR client for the request
  */
private def executeJobFlowSteps(steps: java.util.Set[StepConfig]): AddJobFlowStepsResult = {
  val addStepsRequest = new AddJobFlowStepsRequest()
    .withJobFlowId(emrClusterId)
    .withSteps(steps)
  // Original author's note: this is the call where the error is thrown.
  emr.addJobFlowSteps(addStepsRequest)
}