1# Copyright (C) 2010-2015 Cuckoo Foundation.
2# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
3# See the file 'docs/LICENSE' for copying permission.
4
5from __future__ import absolute_import
6import os
7import gc
8import logging
9from lib.cuckoo.common.abstracts import Report
10from lib.cuckoo.common.exceptions import CuckooDependencyError
11from lib.cuckoo.common.exceptions import CuckooReportError
12from lib.cuckoo.common.objects import File
13from six.moves import zip
14
# MongoDB rejects BSON documents larger than 16 MiB; reports above this
# threshold must be trimmed before they can be stored.
MONGOSIZELIMIT = 0x1000000
# 1 MiB, used both for human-readable size logging and as the step by which
# the trimming threshold is lowered on each retry.
MEGABYTE = 0x100000

# pymongo is an optional dependency: record availability instead of failing
# at import time, so the module only errors when it is actually enabled.
try:
    from pymongo import MongoClient
    from bson.objectid import ObjectId
    from pymongo.errors import ConnectionFailure, InvalidDocument

    HAVE_MONGO = True
except ImportError:
    HAVE_MONGO = False

log = logging.getLogger(__name__)
28
29
class MongoDB(Report):
    """Stores report in MongoDB."""

    # Run after all other reporting modules so the results dict is complete.
    order = 9999

    # Mongo schema version, used for data migration.
    SCHEMA_VERSION = "1"

    def connect(self):
        """Connects to Mongo database, loads options and set connectors.
        @raise CuckooReportError: if unable to connect.
        """
        host = self.options.get("host", "127.0.0.1")
        port = self.options.get("port", 27017)
        db = self.options.get("db", "cuckoo")

        try:
            self.conn = MongoClient(
                host,
                port=port,
                username=self.options.get("username", None),
                password=self.options.get("password", None),
                authSource=db,
            )
            self.db = self.conn[db]
        except TypeError:
            raise CuckooReportError("Mongo connection port must be integer")
        except ConnectionFailure:
            raise CuckooReportError("Cannot connect to MongoDB")

    def debug_dict_size(self, dct):
        """Measures how much string data hangs under each top-level key.

        Used to find the keys responsible for a document exceeding MongoDB's
        size limit.
        @param dct: dictionary to measure (or list whose first element is one).
        @return: list of (key, total_string_length) tuples, largest first.
        """
        if isinstance(dct, list):
            dct = dct[0]

        totals = {k: 0 for k in dct}

        def walk(root, key, val):
            # Recursively sum the length of every string reachable under the
            # top-level key `root`.
            if isinstance(val, dict):
                for k, v in val.items():
                    walk(root, k, v)

            elif isinstance(val, (list, tuple, set)):
                for el in val:
                    walk(root, None, el)

            elif isinstance(val, str):
                totals[root] += len(val)

        for key, val in dct.items():
            walk(key, key, val)

        return sorted(totals.items(), key=lambda item: item[1], reverse=True)

    @classmethod
    def ensure_valid_utf8(cls, obj):
        """Ensures that all strings are valid UTF-8 encoded, which is
        required by MongoDB to be able to store the JSON documents.
        @param obj: analysis results dictionary.
        """
        if not obj:
            return

        items = []
        if isinstance(obj, dict):
            items = obj.items()
        elif isinstance(obj, list):
            items = enumerate(obj)

        for k, v in items:
            # This type check is intentionally not done using isinstance(),
            # because bson.binary.Binary *is* a subclass of bytes/str, and
            # we do not want to convert that.
            if type(v) is str:
                try:
                    v.encode("utf-8")
                except UnicodeEncodeError:
                    # Replace unencodable strings (e.g. lone surrogates) with
                    # a bytes rendering of their code points.
                    obj[k] = "".join(str(ord(_)) for _ in v).encode("utf-8")
            else:
                cls.ensure_valid_utf8(v)

    def run(self, results):
        """Writes report.
        @param results: analysis results dictionary.
        @raise CuckooDependencyError: if pymongo is not installed.
        @raise CuckooReportError: if fails to connect or write to MongoDB.
        """
        # We put the raise here and not at the import because it would
        # otherwise trigger even if the module is not enabled in the config.
        if not HAVE_MONGO:
            raise CuckooDependencyError("Unable to import pymongo " "(install with `pip3 install pymongo`)")

        self.connect()

        # Set mongo schema version.
        # TODO: This is not optimal because it run each analysis. Need to run only one time at startup.
        # NOTE: collection_names()/save() are pymongo 3.x APIs, removed in 4.x.
        if "cuckoo_schema" in self.db.collection_names():
            if self.db.cuckoo_schema.find_one()["version"] != self.SCHEMA_VERSION:
                # BUGFIX: the exception was previously constructed but never
                # raised, so schema mismatches were silently ignored.
                raise CuckooReportError("Mongo schema version not expected, check data migration tool")
        else:
            self.db.cuckoo_schema.save({"version": self.SCHEMA_VERSION})

        # Create a copy of the dictionary. This is done in order to not modify
        # the original dictionary and possibly compromise the following
        # reporting modules.
        report = dict(results)

        if "network" not in report:
            report["network"] = {}

        # Add screenshot paths
        report["shots"] = []
        shots_path = os.path.join(self.analysis_path, "shots")
        if os.path.exists(shots_path):
            shots = [shot for shot in os.listdir(shots_path) if shot.endswith(".jpg")]
            for shot_file in sorted(shots):
                shot_path = os.path.join(self.analysis_path, "shots", shot_file)
                screenshot = File(shot_path)
                if screenshot.valid():
                    # Strip the extension as it's added later
                    # in the Django view
                    report["shots"].append(shot_file.replace(".jpg", ""))

        # Store chunks of API calls in a different collection and reference
        # those chunks back in the report. In this way we should defeat the
        # issue with the oversized reports exceeding MongoDB's boundaries.
        # Also allows paging of the reports.
        new_processes = []

        for process in report.get("behavior", {}).get("processes", []) or []:
            new_process = dict(process)
            chunk = []
            chunks_ids = []
            # Loop on each process call.
            for call in process["calls"]:
                # If the chunk size is 100 or if the loop is completed then
                # store the chunk in MongoDB.
                if len(chunk) == 100:
                    to_insert = {"pid": process["process_id"], "calls": chunk}
                    chunk_id = self.db.calls.insert(to_insert)
                    chunks_ids.append(chunk_id)
                    # Reset the chunk.
                    chunk = []
                # Append call to the chunk.
                chunk.append(call)
            # Store leftovers.
            if chunk:
                to_insert = {"pid": process["process_id"], "calls": chunk}
                chunk_id = self.db.calls.insert(to_insert)
                chunks_ids.append(chunk_id)
            # Add list of chunks.
            new_process["calls"] = chunks_ids
            new_processes.append(new_process)
        # Store the results in the report.
        report["behavior"] = dict(report["behavior"])
        report["behavior"]["processes"] = new_processes
        # Calculate the mlist_cnt for display if present to reduce db load
        if "signatures" in results:
            for entry in results["signatures"]:
                if entry["name"] == "ie_martian_children":
                    report["mlist_cnt"] = len(entry["data"])
                if entry["name"] == "office_martian_children":
                    report["f_mlist_cnt"] = len(entry["data"])

        # Other info we want quick access to from the web UI
        if results.get("virustotal", False) and "positives" in results["virustotal"] and "total" in results["virustotal"]:
            report["virustotal_summary"] = "%s/%s" % (results["virustotal"]["positives"], results["virustotal"]["total"])
        if results.get("suricata", False):

            keywords = ("tls", "alerts", "files", "http", "ssh", "dns")
            keywords_dict = ("suri_tls_cnt", "suri_alert_cnt", "suri_file_cnt", "suri_http_cnt", "suri_ssh_cnt", "suri_dns_cnt")
            for keyword, keyword_value in zip(keywords, keywords_dict):
                if results["suricata"].get(keyword, 0):
                    report[keyword_value] = len(results["suricata"][keyword])

        # Create an index based on the info.id dict key. Increases overall scalability
        # with large amounts of data.
        # Note: Silently ignores the creation if the index already exists.
        self.db.analysis.create_index("info.id", background=True)

        # trick for distributed api
        if results.get("info", {}).get("options", {}).get("main_task_id", ""):
            report["info"]["id"] = int(results["info"]["options"]["main_task_id"])

        # Delete any previous data for this task so a re-run replaces it.
        # NOTE: cursor.count() and Collection.remove() are pymongo 3.x APIs.
        analyses = self.db.analysis.find({"info.id": int(report["info"]["id"])})
        if analyses.count() > 0:
            log.debug("Deleting analysis data for Task %s" % report["info"]["id"])
            for analysis in analyses:
                for process in analysis["behavior"]["processes"]:
                    for call in process["calls"]:
                        self.db.calls.remove({"_id": ObjectId(call)})
                self.db.analysis.remove({"_id": ObjectId(analysis["_id"])})
            log.debug("Deleted previous MongoDB data for Task %s" % report["info"]["id"])

        self.ensure_valid_utf8(report)
        gc.collect()

        # Store the report and retrieve its object id.
        try:
            self.db.analysis.save(report, check_keys=False)
        except InvalidDocument as e:
            parent_key, psize = self.debug_dict_size(report)[0]
            if not self.options.get("fix_large_docs", False):
                # Just log the error and problem keys
                log.error(str(e))
                log.error("Largest parent key: %s (%d MB)" % (parent_key, int(psize) / MEGABYTE))
            else:
                # Delete the problem keys and check for more, lowering the
                # size threshold by one MiB on each failed retry.
                error_saved = True
                size_filter = MONGOSIZELIMIT
                while error_saved:
                    if isinstance(report, list):
                        report = report[0]
                    try:
                        if isinstance(report[parent_key], list):
                            for j, parent_dict in enumerate(report[parent_key]):
                                child_key, csize = self.debug_dict_size(parent_dict)[0]
                                if csize > size_filter:
                                    log.warning("results['%s']['%s'] deleted due to size: %s" % (parent_key, child_key, csize))
                                    del report[parent_key][j][child_key]
                        else:
                            child_key, csize = self.debug_dict_size(report[parent_key])[0]
                            if csize > size_filter:
                                log.warning("results['%s']['%s'] deleted due to size: %s" % (parent_key, child_key, csize))
                                del report[parent_key][child_key]
                        try:
                            self.db.analysis.save(report, check_keys=False)
                            error_saved = False
                        except InvalidDocument as e:
                            parent_key, psize = self.debug_dict_size(report)[0]
                            log.error(str(e))
                            log.error("Largest parent key: %s (%d MB)" % (parent_key, int(psize) / MEGABYTE))
                            size_filter = size_filter - MEGABYTE
                    except Exception as e:
                        log.error("Failed to delete child key: %s" % str(e))
                        error_saved = False

        self.conn.close()
261 self.conn.close()