· 6 years ago · Mar 11, 2020, 10:26 AM
1/*
2 * kcos-system-diagnostics-service
3 *
4 * Service for kcos system diagnostics.
5 *
6 * API version: 7ee9e638c20e7f3e821508b51d88893e292c78ba
7 * Generated by: OpenAPI Generator (https://openapi-generator.tech)
8 */
9
10package service
11
12import (
13 "archive/zip"
14 "bufio"
15 "errors"
16 "fmt"
17 "io"
18 "log"
19 "os"
20 "os/exec"
21 "sync"
22 "sync/atomic"
23 "time"
24)
25
26const (
27 log_collection_running = iota
28 log_collection_success = iota
29 log_collection_failure = iota
30)
31
32type ArchiveInfo struct {
33 filename string
34 timestamp time.Time
35 state uint8
36}
37
38type ArchiveMap map[int32]*ArchiveInfo // key represents the operation id that generated it, value represents information about the archive
39
40var lock = sync.RWMutex{}
41var counter int32
42
43func (archiveMap *ArchiveMap) AddArchive(id int32, archiveName string, currentState uint8) {
44 lock.Lock()
45 defer lock.Unlock()
46 (*archiveMap)[id] = &ArchiveInfo{
47 filename: archiveName,
48 timestamp: time.Now(),
49 state: currentState}
50}
51
52func (archiveMap *ArchiveMap) UpdateArchiveState(id int32, state uint8) {
53 lock.Lock()
54 defer lock.Unlock()
55 v, ok := (*archiveMap)[id]
56 if !ok {
57 log.Printf("UpdateArchiveState was called with id %d which is not tracked", id)
58 return
59 }
60 v.state = state
61 v.timestamp = time.Now()
62}
63
64func (archiveMap *ArchiveMap) DeleteArchive(id int32) {
65 delete(*archiveMap, id)
66}
67
68func (archiveMap *ArchiveMap) DeleteOldArchives() {
69 lock.Lock()
70 defer lock.Unlock()
71 var storageDuration time.Duration = 48 * time.Hour
72 for k, v := range *archiveMap {
73 if time.Since(v.timestamp) >= storageDuration {
74 archiveMap.DeleteArchive(k)
75 // delete file
76 err := os.Remove("/tmp" + v.filename)
77 if err != nil {
78 log.Printf("DeleteOldArchives: error while removing file %s(id %d) :%s", v.filename, k, err.Error())
79 continue
80 }
81 log.Printf("DeleteOldArchives: done deleting file %s (id %d)", v.filename, k)
82 }
83 }
84}
85
86func ScheduleCleanupTimer(cleanupFunction func(), delay time.Duration) chan bool {
87 stop := make(chan bool)
88 go func() {
89 for {
90 cleanupFunction()
91 select {
92 case <-time.After(delay):
93 case <-stop:
94 return
95 }
96 }
97 }()
98 return stop
99}
100
101// KcosSystemDiagnosticsApiService is a service that implents the logic for the KcosSystemDiagnosticsApiServicer
102// This service should implement the business logic for every endpoint for the KcosSystemDiagnosticsApi API.
103// Include any external packages or services that will be required by this service.
104
105type KcosSystemDiagnosticsApiService struct {
106 work_done chan bool
107 archiveMap *ArchiveMap
108 mocking bool
109}
110
111// NewKcosSystemDiagnosticsApiService creates a default api service
112func NewKcosSystemDiagnosticsApiService() KcosSystemDiagnosticsApiServicer {
113 _archiveMap := make(ArchiveMap)
114 done := ScheduleCleanupTimer(_archiveMap.DeleteOldArchives, 1*time.Hour)
115 _, ok := os.LookupEnv("MOCK_DIAGS")
116 var testEnv = false
117 if ok {
118 log.Printf("POST calls will be mocked to provide dummy data")
119 testEnv = true
120 }
121 return &KcosSystemDiagnosticsApiService{
122 work_done: done,
123 archiveMap: &_archiveMap,
124 mocking: testEnv,
125 }
126}
127
128// DownloadLogs - Returns the elasticdump result in a file.
129func (s *KcosSystemDiagnosticsApiService) DownloadLogs(filename string) (interface{}, error) {
130 lock.RLock()
131 defer lock.RUnlock()
132 for _, v := range *s.archiveMap {
133 if v.filename == filename {
134 return fmt.Sprintf("/tmp/%s", filename), nil
135 }
136 }
137 return nil, errors.New(fmt.Sprintf("Archive %s was not generated via export", filename))
138}
139
140// GetIndex - The index page
141func (s *KcosSystemDiagnosticsApiService) GetIndex() (interface{}, error) {
142 return "index", nil
143}
144
145// KcosSystemDiagnosticsComponents - Returns the components available for collection.
146func (s *KcosSystemDiagnosticsApiService) KcosSystemDiagnosticsComponents() (interface{}, error) {
147 return []KcosSystemDiagnosticsComponent{
148 KcosSystemDiagnosticsComponent{ComponentName: "elasticdump", Subcomponents: []string{}},
149 KcosSystemDiagnosticsComponent{ComponentName: "clusterinfo", Subcomponents: []string{}},
150 }, nil
151}
152
153func GenerateArchiveName() (string, int32) {
154 id := atomic.AddInt32(&counter, 1)
155 return fmt.Sprintf("chassis_logs-%s-%d.zip", time.Now().Format("2006-01-02-15-04-05"), id), id
156}
157
158func GenerateFilenameWithTimestamp(prefix string, suffix string) (string) {
159 return fmt.Sprintf("%s-%s.%s", prefix, time.Now().Format("2006-01-02-15-04-05"), suffix)
160}
161
162func DeleteFile(filename string) {
163 err := os.Remove(filename)
164 if err != nil {
165 log.Printf("DeleteFile: Failed to remove file %s: %s", filename, err)
166 }
167}
168
169// ZipFiles compresses one or many files into a single zip archive file.
170// Param 1: filename is the output zip file's name.
171// Param 2: files is a list of files to add to the zip.
172func ZipFiles(filename string, files []string) error {
173 newZipFile, err := os.Create(filename)
174 if err != nil {
175 return err
176 }
177 defer newZipFile.Close()
178 zipWriter := zip.NewWriter(newZipFile)
179 defer zipWriter.Close()
180 // Add files to zip
181 for _, file := range files {
182 if err = AddFileToZip(zipWriter, file); err != nil {
183 return err
184 }
185 }
186 return nil
187}
188
189func AddFileToZip(zipWriter *zip.Writer, filename string) error {
190 fileToZip, err := os.Open(filename)
191 if err != nil {
192 return err
193 }
194 defer fileToZip.Close()
195 // Get the file information
196 info, err := fileToZip.Stat()
197 if err != nil {
198 return err
199 }
200 header, err := zip.FileInfoHeader(info)
201 if err != nil {
202 return err
203 }
204 // Using FileInfoHeader() above only uses the basename of the file. If we want
205 // to preserve the folder structure we can overwrite this with the full path.
206 header.Name = filename
207 // Change to deflate to gain better compression
208 // see http://golang.org/pkg/archive/zip/#pkg-constants
209 header.Method = zip.Deflate
210 writer, err := zipWriter.CreateHeader(header)
211 if err != nil {
212 return err
213 }
214 _, err = io.Copy(writer, fileToZip)
215 return err
216}
217
218func RunElasticdump(filename string, files *[]string) error {
219 log.Printf("RunElasticdump: Starting generating the ES-dump file: %s", filename)
220
221 cmd := exec.Command("elasticdump",
222 "--input=http://elasticsearch-master:9200/_all",
223 "--output=/tmp/"+filename,
224 "--type=data",
225 "--fsCompress",
226 "--concurrency=16", // TODO: get number of cores from pod
227 "--concurrencyInterval=1000",
228 "--limit=8192")
229
230 err := cmd.Run()
231 if err != nil {
232 log.Printf("RunElasticdump: cmd.Run() failed to generate the ES-dump file %s:%s\n", filename, err)
233 return err
234 }
235
236 *files = append(*files, filename)
237 log.Printf("RunElasticdump: Finished generating the ES-dump file: %s", filename)
238 return nil
239}
240
241func RunClusterInfoDump(filename string, files *[]string) error {
242 log.Printf("RunClusterInfoDump: Starting generating the cluster info dump file: %s", filename)
243
244 file, err := os.Create(filename)
245 if err != nil {
246 log.Printf("RunClusterInfoDump: os.Create() failed for file %s:%s\n", filename, err)
247 return err
248 }
249 defer file.Close()
250
251 cmd := exec.Command("kubectl", "cluster-info", "dump")
252 cmd.Stdout = file
253 err = cmd.Run()
254 if err != nil {
255 log.Printf("RunClusterInfoDump: cmd.Run() failed to generate cluster info dump file %s:%s\n", filename, err)
256 return err
257 }
258
259 *files = append(*files, filename)
260 log.Printf("RunClusterInfoDump: Finished generating the cluster-info dump: %s\n", filename)
261 return nil
262
263}
264
265func RunLogCollection(archiveName string, id int32, archiveMap *ArchiveMap, mocking bool) {
266 log.Printf("RunLogCollection: Started generating the kcos system diagnostics: %s", archiveName)
267
268 if mocking {
269 archiveName = "mock-diags.txt"
270 }
271 archiveMap.AddArchive(id, archiveName, log_collection_running)
272
273 if !mocking {
274 var files []string // Contains the files that will be addded to the archive
275
276 var es_data = GenerateFilenameWithTimestamp("/tmp/elasticsearchdata", ".gz")
277 err1 := RunElasticdump(es_data, &files)
278 defer DeleteFile(es_data)
279
280 var cluster_data = GenerateFilenameWithTimestamp("/tmp/cluster-info", ".txt")
281 err2 := RunClusterInfoDump(cluster_data, &files)
282 defer DeleteFile(cluster_data)
283
284 if err1 != nil && err2 != nil {
285 archiveMap.UpdateArchiveState(id, log_collection_failure)
286 return
287 }
288
289 if err := ZipFiles("/tmp/"+archiveName, files); err != nil {
290 log.Printf("RunLogCollection: Error archiving files %v: %s\n", files, err)
291 return
292 }
293 fmt.Printf("RunLogCollection: Zipped files %v into %s", files, archiveName)
294 } else {
295 f, err := os.Create("/tmp/" + archiveName)
296 if err != nil {
297 log.Printf("RunLogCollection: failed to create the file /tmp/%s: %s", archiveName, err)
298 archiveMap.UpdateArchiveState(id, log_collection_failure)
299 return
300 }
301 w := bufio.NewWriter(f)
302 w.WriteString("Test diagnostics file")
303 w.Flush()
304 time.Sleep(15 * time.Second)
305 }
306
307 archiveMap.UpdateArchiveState(id, log_collection_success)
308 log.Printf("RunLogCollection: Finished generating the kcos system diagnostics: %s", archiveName)
309}
310
311// KcosSystemDiagnosticsExport - Collects the logs for one or more components.
312func (s *KcosSystemDiagnosticsApiService) KcosSystemDiagnosticsExport(kcosSystemDiagnosticsComponent []KcosSystemDiagnosticsComponent) (interface{}, error) {
313 archiveName, id := GenerateArchiveName()
314 go RunLogCollection(archiveName, id, s.archiveMap, s.mocking)
315 return &KcosExportResponse{
316 Id: id,
317 Type: "/api/v2/diagnostics/operations/export",
318 State: "In progress",
319 Progress: 0,
320 Message: "Operation started succesfully",
321 Url: fmt.Sprintf("/api/v2/diagnostics/operations/export/%d", id),
322 }, nil
323}
324
325// GetExportStatus - Returns the state of an ongoing export operation.
326func (s *KcosSystemDiagnosticsApiService) GetExportStatus(id int32) (interface{}, error) {
327 lock.RLock()
328 defer lock.RUnlock()
329 v, ok := (*s.archiveMap)[id]
330 if !ok {
331 log.Printf("Requested status for operation %d which was not started", id)
332 return nil, errors.New("Requested status for operation which was not started")
333 }
334
335 switch v.state {
336 case log_collection_running:
337 return &KcosExportResponse{
338 Id: id,
339 Type: "/api/v2/diagnostics/operations/export",
340 State: "In progress",
341 Progress: 0,
342 Message: "Operation in progress",
343 Url: fmt.Sprintf("/api/v2/diagnostics/operations/export/%d", id),
344 }, nil
345 case log_collection_failure:
346 return &KcosExportResponse{
347 Id: id,
348 Type: "/api/v2/diagnostics/operations/export",
349 State: "Error",
350 Progress: 0,
351 Message: "Operation finished unsuccesfully",
352 Url: fmt.Sprintf("/api/v2/diagnostics/operations/export/%d", id),
353 }, nil
354 case log_collection_success:
355 return &KcosExportResponse{
356 Id: id,
357 Type: "/api/v2/diagnostics/operations/export",
358 State: "Success",
359 Progress: 100,
360 Message: "Operation finished succesfully",
361 Url: fmt.Sprintf("/api/v2/diagnostics/operations/export/%d", id),
362 ResultUrl: fmt.Sprintf("/api/v2/diagnostics/operations/export/result/%s", v.filename),
363 }, nil
364 default:
365 log.Printf("Invalid internal state %d", v.state)
366 return nil, errors.New(fmt.Sprintf("Invalid internal state %d", v.state))
367 }
368}