· 5 years ago · Mar 02, 2021, 03:50 AM
1import { parseAsync, transforms as transformations } from 'json2csv'
2import {
3 getFeatureDoc,
4 getChannelDoc,
5} from '@lumiere/shared/utils/firestoreHelpers'
6import { CommentDataExport } from '@lumiere/shared/types'
7// import adminDB from '@lumiere/shared/services/adminDB'
8import {
9 Indices,
10 getDataWithScrollAPI,
11} from '@lumiere/shared/services/elasticsearch'
12import batchAsyncOps from '@lumiere/shared/utils/batchAsyncOps'
13
/**
 * Channel columns merged into an export row. Both fields are absent when no
 * channel id was supplied or the channel document could not be found.
 */
interface ChannelData {
  cid?: string
  channelTitle?: string
}

// Channels that have already been resolved, kept for the lifetime of the module.
const channelsMap = new Map<string, ChannelData>()

// Lookups that are currently in flight, keyed by channel id. Callers that ask
// for the same channel while a read is pending share that read instead of
// issuing their own.
const pendingChannelLookups = new Map<string, Promise<ChannelData>>()

/**
 * Reads one channel document and caches it when it exists.
 * A missing channel yields `{}` and is NOT cached, so it is looked up again
 * on the next request.
 */
const loadChannelData = async (cid: string): Promise<ChannelData> => {
  const channel = await getChannelDoc(cid)
  if (!channel) return {}

  const data: ChannelData = { channelTitle: channel.title, cid }
  channelsMap.set(cid, data)
  return data
}

/**
 * Returns the pending lookup for `cid`, starting one if none is in flight.
 */
const sharedChannelLookup = (cid: string): Promise<ChannelData> => {
  const inFlight = pendingChannelLookups.get(cid)
  if (inFlight) return inFlight

  const lookup = loadChannelData(cid)
  pendingChannelLookups.set(cid, lookup)

  // Drop the entry once the read settles (success or failure) so a failed
  // read is retried by the next caller rather than replayed forever.
  const release = () => {
    pendingChannelLookups.delete(cid)
  }
  void lookup.then(release, release)

  return lookup
}

/**
 * Resolves the id and title of a channel.
 *
 * @param cid Channel id, or `null` when there is no channel.
 * @returns `{ cid, channelTitle }` for an existing channel, otherwise `{}`.
 *   Rejects with whatever `getChannelDoc` rejects with.
 */
const getChannelNameAndId = async (
  cid: string | null,
): Promise<ChannelData> => {
  if (!cid) return {}

  return channelsMap.get(cid) ?? sharedChannelLookup(cid)
}
34
/**
 * Unwraps a sentiment value: when index 0 of the input holds something
 * non-nullish that element is returned, otherwise the input is handed back
 * unchanged (including `null` / `undefined`).
 */
const extractSentiment = (sentiment: any) => {
  if (sentiment === null || sentiment === undefined) {
    return sentiment
  }

  const firstEntry = sentiment[0]
  if (firstEntry === null || firstEntry === undefined) {
    return sentiment
  }

  return firstEntry
}
38
// Batch size handed to batchAsyncOps when converting hits into export rows.
const PER_BATCH = 10

/**
 * Builds the entity export for one feature of one video.
 *
 * Queries the `external_data` index for documents of type `external.data`
 * that belong to the given video and feature, turns every hit that carries at
 * least one named, typed entity into an export row, and renders those rows
 * as CSV using the options from `entitiesExportFields()`.
 *
 * @param vid Video id used in the query and as fallback for the row's `vid`.
 * @param fid Feature id used in the query and to look up the feature label.
 * @returns `{ csv, json }` where `json` holds the rows and `csv` their CSV
 *   rendering; `{ csv: '', json: [] }` when the query yields no hits value.
 * @throws Re-throws (after logging) any error raised while rendering the CSV.
 */
export async function fetchVideoExternalDataEntitiesDataExport(
  vid: string,
  fid: string,
) {
  const queryParams = {
    bool: {
      filter: [
        { term: { 'asset.video.keyword': vid } },
        { term: { 'data.fid.keyword': fid } },
        { term: { 'data.type.keyword': 'external.data' } },
      ],
    },
  }

  // The feature label is repeated on every row as `featureName`.
  const feature = await getFeatureDoc(vid, fid)
  const featureName = feature?.label

  // query elasticsearch scroll API
  const hits = await getDataWithScrollAPI({
    index: Indices.external_data,
    body: { query: queryParams, track_total_hits: true },
    filter_path: 'hits',
    size: 20000,
  })

  if (!hits) {
    return { csv: '', json: [] }
  }

  const results: CommentDataExport[] = []

  await batchAsyncOps(
    hits,
    async (hit) => {
      const { _source, _id } = hit
      const { payload } = _source.data
      const comment = payload.comment

      // 1. Keep only entities that have both a name and a type.
      const entities = (comment?.entities ?? []).filter(
        (ent: { [key: string]: any }) => !!ent.name && !!ent.type,
      )

      // Rows without entities never reach the export, so stop here. This also
      // covers hits that have no comment at all, which would otherwise fail
      // on the comment field reads below, and avoids a pointless channel read.
      if (!entities.length) return

      // 2. Get the channel id and title if data was collected in channel context
      const channelData = await getChannelNameAndId(_source.asset.channel)

      const sentiment = extractSentiment(comment.sentiment)

      // 3. Reconstruct the row and append it to the export.
      results.push({
        id: _id,
        createdOn: _source.createdOn,
        userId: _source.userId,
        eid: _source.eid,
        vid: _source.asset.video ?? vid,
        ...channelData,
        fid: _source.data.fid,
        featureName,
        viewId: _source.id,
        time: payload.currentTime,
        submittedTime: payload.submittedTime,
        text: comment.text,
        sentiment,
        sentences: sentiment?.sentences ?? [],
        entities,
        rating: comment.plus?.rating,
      })
    },
    PER_BATCH,
  )

  const opts = entitiesExportFields()

  try {
    const csv = await parseAsync(results, opts)
    return { csv, json: results }
  } catch (err) {
    console.error({ err })
    throw err
  }
}
138
/**
 * Produces the json2csv options for the entity export: one column per
 * entity attribute plus the owning comment id, with the `entities` array
 * unwound by the `unwind` transform.
 */
const entitiesExportFields = () => {
  // [column label, path of the value inside a row]
  const columns: [string, string][] = [
    ['commentId', 'id'],
    ['name', 'entities.name'],
    ['type', 'entities.type'],
    ['salience', 'entities.salience'],
    ['sentimentScore', 'entities.sentiment.score'],
    ['sentimentMagnitude', 'entities.sentiment.magnitude'],
  ]
  const fields = columns.map(([label, value]) => ({ label, value }))

  const unwindTransform = transformations.unwind
  const transforms = [unwindTransform({ paths: ['entities'] })]

  return { fields, transforms }
}
156