package com.example.parquet.writing

import java.nio.charset.StandardCharsets
import java.util
import java.util.Date

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.collection.JavaConversions._
import com.datastax.driver.core.{ColumnDefinitions, DataType}
import com.trax.platform.shared.Cassandra
import com.typesafe.scalalogging.LazyLogging
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.ParquetEncodingException
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import scala.util.Try
/**
 *
create external table qv_015a8671_a4b0_510f_b6d2_a8cb2f5b66fd (
  appname string,
  email string,
  groupname string,
  grouporgid string,
  grouporgname string,
  homeorgid string,
  homeorgname string,
  id int,
  isadmin int,
  isgrouphomeorg int,
  istraxuser int,
  legacy_id string,
  name string
)
STORED AS PARQUET
LOCATION 's3://trax-spark-dev/qv_015a8671_a4b0_510f_b6d2_a8cb2f5b66fd'
tblproperties ("parquet.compress"="GZIP");
 */
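// The DDL above is the Hive-style external table definition used to query, from Hive or
// Athena, the Parquet files this program writes to S3.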

object Main extends LazyLogging {
  private val NumberOfRecords = 100 * 1000
  private val rnd = new scala.util.Random

  def createS3ParquetWriter(path: String, accessKey: String, secretKey: String, table: String,
                            row: ColumnDefinitions, compressionCodecName: CompressionCodecName,
                            blockSize: Int, pageSize: Int): ParquetWriter[Map[String, Any]] = {
    import org.apache.parquet.schema.MessageTypeParser

    // Build a Parquet message schema from the Cassandra column definitions; every field
    // is optional, since any Cassandra column can be null.
    val schemaString = s"message $table {\n " + row.map { col =>
      col.getType.getName match {
        case DataType.Name.VARCHAR | DataType.Name.TEXT | DataType.Name.ASCII => s"optional binary ${col.getName} (UTF8)"
        case DataType.Name.DECIMAL => s"optional double ${col.getName}"
        case DataType.Name.BOOLEAN => s"optional boolean ${col.getName}"
        case DataType.Name.INT => s"optional int32 ${col.getName}"
        case DataType.Name.FLOAT => s"optional float ${col.getName}"
        case DataType.Name.DOUBLE => s"optional double ${col.getName}"
        case DataType.Name.BIGINT => s"optional int64 ${col.getName}"
        case DataType.Name.COUNTER => s"optional int64 ${col.getName}"
        case DataType.Name.TIMESTAMP => s"optional int64 ${col.getName} (TIMESTAMP_MILLIS)"
        case DataType.Name.BLOB => s"optional binary ${col.getName}"
        case DataType.Name.VARINT => s"optional int64 ${col.getName}"
        case DataType.Name.UUID | DataType.Name.TIMEUUID => s"optional binary ${col.getName} (UTF8)"
        // dates are converted to epoch millis before writing, so annotate as TIMESTAMP_MILLIS
        case DataType.Name.DATE => s"optional int64 ${col.getName} (TIMESTAMP_MILLIS)"
        case other => throw new IllegalArgumentException(s"Unsupported Parquet column type: $other")
      }
    }.mkString(";\n") + ";\n}"
    System.out.println(schemaString)
    val schema = MessageTypeParser.parseMessageType(schemaString)
    val writeSupport = new CassandraWriteSupport(schema)

    val outputPath = new Path(path)

    // Hand the S3 credentials to the s3a filesystem the writer will use.
    val conf = new Configuration
    conf.set("fs.s3a.access.key", accessKey)
    conf.set("fs.s3a.secret.key", secretKey)

    new ParquetWriter[Map[String, Any]](outputPath,
      writeSupport, compressionCodecName, blockSize, pageSize, pageSize,
      ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
      ParquetWriter.DEFAULT_WRITER_VERSION,
      conf
    )
  }
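
  // For illustration, a table with a text column `name` and an int column `id` (both
  // hypothetical) yields a schema string along the lines of:
  //
  //   message qv_example {
  //     optional binary name (UTF8);
  //     optional int32 id;
  //   }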
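
  // ParquetWriter drives this WriteSupport: init() supplies the message schema and any
  // extra key/value file metadata, prepareForWrite() hands over the RecordConsumer, and
  // write() is then called once per record (here, one Map per Cassandra row).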
  class CassandraWriteSupport(schema: MessageType) extends WriteSupport[Map[String, Any]] {

    import PrimitiveTypeName._

    var recordConsumer: RecordConsumer = null

    override def init(configuration: Configuration): WriteSupport.WriteContext = {
      new WriteSupport.WriteContext(schema, new util.HashMap[String, String]())
    }

    override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
      this.recordConsumer = recordConsumer
    }

    import org.apache.parquet.io.api.Binary

    // Encode the value's string form as UTF-8 bytes explicitly, rather than relying on
    // the platform default charset.
    def stringToBinary(value: Any) = Binary.fromReusedByteArray(value.toString.getBytes(StandardCharsets.UTF_8))

    override def write(record: Map[String, Any]): Unit = {
      recordConsumer.startMessage

      // Walk the schema columns in order; the field index passed to startField/endField
      // must match the column's position in the schema.
      var i = 0
      schema.getColumns.foreach { desc =>
        val theType = desc.getType
        record.get(desc.getPath()(0)).foreach { v =>
          try {
            if (v != null) {
              recordConsumer.startField(desc.getPath()(0), i)
              theType match {
                case BOOLEAN => recordConsumer.addBoolean(v.asInstanceOf[Boolean])
                case BINARY => recordConsumer.addBinary(stringToBinary(v))
                case INT32 => recordConsumer.addInteger(v.asInstanceOf[Int])
                case DOUBLE =>
                  v match {
                    case x: java.math.BigDecimal => recordConsumer.addDouble(x.doubleValue())
                    case x: Double => recordConsumer.addDouble(x)
                  }
                case FLOAT => recordConsumer.addFloat(v.asInstanceOf[Float])
                case INT64 =>
                  v match {
                    // varints come back from the driver as BigInteger; narrow to long
                    case x: java.math.BigInteger => recordConsumer.addLong(x.longValue())
                    case x: java.math.BigDecimal => recordConsumer.addLong(x.longValue())
                    case x: Date => recordConsumer.addLong(x.getTime)
                    case x: Long => recordConsumer.addLong(x)
                  }
                case _ => throw new ParquetEncodingException(
                  "Unsupported column type: " + v.getClass.getName)
              }
              recordConsumer.endField(desc.getPath()(0), i)
            }
          } catch {
            case e: Exception =>
              val value = Option(v).getOrElse("null")
              System.out.println(s"failed on col:${desc.getPath()(0)} type:$theType value:${value.toString}: " + e.getMessage)
              recordConsumer.endField(desc.getPath()(0), i)
              throw e
          }
        }
        i = i + 1
      }
      recordConsumer.endMessage
    }
  }

  private def writeToS3(path: String, accessKey: String, secretKey: String, casUser: String, casPwd: String,
                        seeds: Seq[String], keyspace: String, table: String,
                        compressionCodecName: CompressionCodecName, blockSize: Int, pageSize: Int) = {
    System.out.println("starting...")

    val session = Cassandra.session(keyspace, seeds, None, Option(casUser), Option(casPwd))
    val stmt = s"select * from $table"
    System.out.println("connected...")

    var parquetWriter: ParquetWriter[Map[String, Any]] = null
    var i = 0
    val rs = session.execute(stmt)
    rs.iterator().foreach { row =>

      // The writer is created lazily from the first row, since the Parquet schema is
      // derived from that row's column definitions.
      if (i == 0) {
        parquetWriter = createS3ParquetWriter(path, accessKey, secretKey, table, row.getColumnDefinitions,
          compressionCodecName, blockSize, pageSize)
      }
      i = i + 1

      // Convert the row into a Map keyed by column name, skipping null columns.
      val data: Map[String, Any] =
        row.getColumnDefinitions
          .filter(cdef => !row.isNull(cdef.getName))
          .map { cdef =>
            val colName = cdef.getName
            cdef.getType.getName match {
              case DataType.Name.VARCHAR | DataType.Name.TEXT | DataType.Name.ASCII =>
                colName -> row.getString(cdef.getName)
              case DataType.Name.DECIMAL =>
                colName -> row.getDecimal(cdef.getName)
              case DataType.Name.BOOLEAN =>
                colName -> row.getBool(cdef.getName)
              case DataType.Name.INT =>
                colName -> row.getInt(cdef.getName)
              case DataType.Name.FLOAT =>
                colName -> row.getFloat(cdef.getName)
              case DataType.Name.DOUBLE =>
                colName -> row.getDouble(cdef.getName)
              case DataType.Name.BIGINT =>
                colName -> row.getLong(cdef.getName)
              case DataType.Name.COUNTER =>
                colName -> row.getLong(cdef.getName)
              case DataType.Name.TIMESTAMP =>
                colName -> row.getTimestamp(cdef.getName)
              case DataType.Name.BLOB =>
                colName -> row.getString(cdef.getName)
              case DataType.Name.VARINT =>
                colName -> row.getVarint(cdef.getName)
              case DataType.Name.UUID | DataType.Name.TIMEUUID =>
                colName -> Option(row.getUUID(cdef.getName)).fold(null: String)(_.toString)
              case DataType.Name.DATE =>
                colName -> Try(new Date(row.getDate(cdef.getName).getMillisSinceEpoch)).toOption.orNull
              case _ =>
                colName -> row.getString(cdef.getName)
            }
          }.toMap

      parquetWriter.write(data)
      if (i % 1000 == 0) System.out.println(s"wrote $i rows")
    }
    if (parquetWriter != null) parquetWriter.close()
  }
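
  // A minimal sketch of driving the writer directly with a hand-built record, assuming
  // valid S3 credentials and column definitions taken from a Cassandra row (the bucket,
  // keys, and values below are illustrative):
  //
  //   val writer = createS3ParquetWriter("s3a://some-bucket/example", "ACCESS", "SECRET",
  //     "example_table", row.getColumnDefinitions, CompressionCodecName.SNAPPY,
  //     256 * 1024 * 1024, 1024 * 1024)
  //   writer.write(Map("name" -> "alice", "id" -> 1))
  //   writer.close()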

  def main(args: Array[String]): Unit = {
    val accessKey = "ABC"
    val secretKey = "*"
    //val table = "qv_efafa77b_dd46_5bf9_87f0_240b715f0797"
    val qv_id = "2deadfea-4367-5ba9-9499-a61a044d10a5"
    val table = "qv_" + qv_id.replace("-", "_")
    val s3Path = s"s3a://trax-spark-dev/test/$qv_id"

    val keyspace = "quickviews"
    val seeds = Seq("172.24.0.7", "172.24.0.8", "172.24.0.9")
    val casUser = "johnbush"
    val casPwd = "*"
    val compressionCodecName = CompressionCodecName.SNAPPY
    //val compressionCodecName = CompressionCodecName.GZIP

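    // The block size is the Parquet row-group size and the page size is the size of the
    // data pages within each column chunk (here 256 MB row groups with 1 MB pages).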
    val blockSize = 256 * 1024 * 1024
    val pageSize = 1 * 1024 * 1024

    val start = new Date().getTime

    try {
      if (s3Path != null) {
        writeToS3(s3Path, accessKey, secretKey, casUser, casPwd, seeds, keyspace, table, compressionCodecName, blockSize, pageSize)
      } else {
        println("S3_PATH is empty")
      }
    } finally {
      Cassandra.close()
    }
    val end = new Date().getTime
    println(s"took ${end - start} ms to load table:$qv_id to $s3Path")
  }
}
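
// Note: writing to s3a:// paths requires the hadoop-aws module (and a compatible AWS SDK)
// on the classpath, alongside parquet-hadoop and the DataStax Java driver used above.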