(g)ULP!
Loading...
Searching...
No Matches
plugin.py
Go to the documentation of this file.
1"""Gulp plugin base class and plugin utilities.
2"""
3
4import ipaddress
5import os
6from abc import ABC, abstractmethod
7from types import ModuleType
8from typing import Any, Callable
9
10import muty.crypto
11import muty.dynload
12import muty.file
13import muty.jsend
14import muty.string
15import muty.time
16from sigma.processing.pipeline import ProcessingPipeline
17
18from gulp import config
19from gulp import utils as gulp_utils
20from gulp.api import collab_api, elastic_api
21from gulp.api.collab.base import GulpRequestStatus
22from gulp.api.collab.stats import GulpStats, TmpIngestStats
23from gulp.api.elastic.structs import (
24 GulpDocument,
25 GulpIngestionFilter,
26 GulpQueryFilter,
27 GulpQueryOptions,
28)
29from gulp.api.mapping import helpers as mapping_helpers
30from gulp.api.mapping.models import FieldMappingEntry, GulpMapping, GulpMappingOptions
31from gulp.api.rest import ws as ws_api
32from gulp.api.rest.ws import WsQueueDataType
33from gulp.defs import (
34 GulpEventFilterResult,
35 GulpLogLevel,
36 GulpPluginType,
37 ObjectNotFound,
38)
39from gulp.plugin_internal import GulpPluginOption, GulpPluginParams
40from gulp.utils import logger
41
# Module-level cache of plugin modules, kept for the lifetime of the running process.
# NOTE(review): the key/value layout is not visible in this part of the file -
# presumably plugin name/path -> loaded module; confirm at the call sites.
_cache: dict = {}
44
45
46class PluginBase(ABC):
47 """
48 Base class for all Gulp plugins.
49 """
50
52 self,
53 path: str,
54 **kwargs,
55 ) -> None:
56 """
57 Initializes a new instance of the PluginBase class.
58
59 Args:
60 path (str): The path to the plugin.
61 kwargs: additional arguments if any
62 """
63 self.req_id: str = None
64 self.index: str = None
65 self.client_id: str = None
66 self.operation_id: str = None
67 self.context: str = None
68 self.ws_id: str = None
69 self.path = path
70
71 self.buffer: list[GulpDocument] = []
72 for k, v in kwargs.items():
73 self.__dict__[k] = v
74
75 super().__init__()
76
def options(self) -> list[GulpPluginOption]:
    """
    List the plugin-specific parameters (GulpPluginOption) this plugin exposes.

    Returns:
        list[GulpPluginOption]: the base implementation exposes none, so a new
            empty list is returned on every call.
    """
    no_options: list[GulpPluginOption] = []
    return no_options
82
@abstractmethod
def type(self) -> GulpPluginType:
    """
    Report the kind of plugin (a GulpPluginType value).

    Abstract: the base class provides no body, concrete plugins must override it.
    """
88
@abstractmethod
def version(self) -> str:
    """
    Report the plugin version string.

    Abstract: the base class provides no body, concrete plugins must override it.
    """
94
@abstractmethod
def desc(self) -> str:
    """
    Give a human readable description of the plugin.

    Abstract: the base class provides no body, concrete plugins must override it.
    """
100
@abstractmethod
def name(self) -> str:
    """
    Give the plugin name.

    Abstract: the base class provides no body, concrete plugins must override it.
    """
106
def event_type_field(self) -> str:
    """
    Name of the document field that carries the event type.

    Returns:
        str: "event.code" in the base implementation.
    """
    field_name = "event.code"
    return field_name
112
def depends_on(self) -> list[str]:
    """
    List the plugins this plugin depends on.

    Returns:
        list[str]: a new empty list in the base implementation (no dependencies).
    """
    dependencies: list[str] = []
    return dependencies
118
async def query(
    self,
    operation_id: int,
    client_id: int,
    user_id: int,
    username: str,
    ws_id: str,
    req_id: str,
    plugin_params: GulpPluginParams,
    flt: GulpQueryFilter,
    options: GulpQueryOptions = None,
) -> tuple[int, GulpRequestStatus]:
    """
    Entry point for query plugins, which fetch data directly from an external source.

    The base implementation performs no query: it reports zero records and a FAILED status.

    Args:
        operation_id (int): operation ID
        client_id (int): client ID
        user_id (int): ID of the user performing the query
        username (str): name of the user performing the query
        ws_id (str): ID of the websocket the returned data is streamed to
        req_id (str): request ID
        plugin_params (GulpPluginParams): plugin parameters; plugin dependent, i.e. GulpPluginParams.extra
            may carry the login/pwd/token needed to connect to the external source.
        flt (GulpQueryFilter): query filter (to be converted to the external source query format)
        options (GulpQueryOptions, optional): query options, i.e. to limit the number of returned records. Defaults to None.
            due to the nature of query plugins, not every option may be supported (i.e. limit, offset, ...)
            and notes creation is always disabled.
    Returns:
        tuple[int, GulpRequestStatus]: number of records returned and status of the query.
    """
    # nothing to query from in the base class
    records_returned = 0
    return records_returned, GulpRequestStatus.FAILED
149
def internal(self) -> bool:
    """
    Tell whether the plugin is meant for internal use only.

    Returns:
        bool: False in the base implementation.
    """
    internal_only = False
    return internal_only
155
def tags(self) -> list[str]:
    """
    List the tags attached to the plugin.

    Tags help filtering plugins/query filters in the UI, examples:
    - "event"
    - "network"
    - "file"
    - "process"
    - "threat"
    - "threat.enrichments"
    - ...

    Returns:
        list[str]: a new empty list in the base implementation (no tags).
    """
    plugin_tags: list[str] = []
    return plugin_tags
168
170 self, custom_mapping: GulpMapping, plugin_params: GulpPluginParams = None
171 ) -> tuple[GulpMapping, GulpPluginParams]:
172 """
173 Process the plugin parameters checking parameters as `timestamp_field`, ... and update the custom mapping accordingly.
174
175 Args:
176 custom_mapping (GulpMapping): The custom mapping provided: if it is not empty, it will be used as is (plugin_params will be ignored)
177 plugin_params (GulpPluginParams, optional): The plugin parameters. Defaults to None.
178
179 Returns:
180 tuple[GulpMapping, GulpPluginParams]: The updated custom mapping and plugin parameters.
181 """
182 if plugin_params is None:
183 plugin_params = GulpPluginParams()
184
185 if len(custom_mapping.fields) > 0:
186 # custom_mapping provided
187 return custom_mapping, plugin_params
188
189 logger().warning("no custom mapping provided")
190
191 tf = plugin_params.timestamp_field
192 if tf is not None:
193 logger().debug("using timestamp_field=%s" % (tf))
194 # build a proper custom_mapping with just timestamp
195 custom_mapping.fields[tf] = FieldMappingEntry(is_timestamp=True)
196 # logger().debug(custom_mapping)
197 return custom_mapping, plugin_params
198
200 self,
201 pipeline: ProcessingPipeline = None,
202 mapping_file: str = None,
203 mapping_id: str = None,
204 product: str = None,
205 plugin_params: GulpPluginParams = None,
206 ) -> ProcessingPipeline:
207 """
208 Initializes the Sigma plugin to convert sigma rules YAML to elasticsearch DSL query.
209
210 Args:
211 pipeline (ProcessingPipeline, optional): The processing pipeline. Defaults to None (empty pipeline, plugin must fill it).
212 mapping_file (str, optional): The name of the mapping file (i.e. 'windows.json') in the gulp/mapping_files directory. Defaults to None (use pipeline mapping only).
213 mapping_id (str, optional): The mapping ID (["options"]["mapping_id"] in the mapping file, to get a specific mapping). Defaults to None (use first).
214 product (str, optional): The product. Defaults to None.
215 plugin_params (GulpPluginParams, optional): The plugin parameters, to override i.e. mapping_file_path, mapping_id, ... Defaults to None.
216 Returns:
217 ProcessingPipeline: The initialized processing pipeline.
218 """
219 logger().debug("INITIALIZING SIGMA plugin=%s" % (self.name()))
220
221 mapping_file_path = None
222 if mapping_file is not None:
223 # get path in mapping directory
224 mapping_file_path = gulp_utils.build_mapping_file_path(mapping_file)
225
226 if plugin_params is not None:
227 # override with plugin_params
228 if plugin_params.mapping_file is not None:
229 mapping_file_path = gulp_utils.build_mapping_file_path(
230 plugin_params.mapping_file
231 )
232 if plugin_params.mapping_id is not None:
233 mapping_id = plugin_params.mapping_id
234
235 p = await mapping_helpers.get_enriched_pipeline(
236 pipeline=pipeline,
237 mapping_file_path=mapping_file_path,
238 mapping_id=mapping_id,
239 product=product,
240 )
241 return p
242
244 self,
245 index: str,
246 record: any,
247 record_idx: int,
248 my_record_to_gulp_document_fun: Callable,
249 ws_id: str,
250 req_id: str,
251 operation_id: int,
252 client_id: int,
253 context: str,
254 source: str,
255 fs: TmpIngestStats,
256 custom_mapping: GulpMapping = None,
257 index_type_mapping: dict = None,
258 plugin: str = None,
259 plugin_params: GulpPluginParams = None,
260 flt: GulpIngestionFilter = None,
261 **kwargs,
262 ) -> tuple[TmpIngestStats, bool]:
263 """
264 Process a record for ingestion, updating ingestion stats.
265 Args:
266 index (str): The index to ingest the record into.
267 record (any): The record to process.
268 record_idx (int): The index of the record as in the source.
269 my_record_to_gulp_document_fun (Callable): The function (for this plugin) to convert the record to one or more GulpDocument/s.
270 ws_id (str): The websocket ID
271 req_id (str): The request ID.
272 operation_id (int): The operation ID.
273 client_id (int): The client ID.
274 context (str): The context of the record.
275 source (str): The source of the record.
276 fs (TmpIngestStats): The temporary ingestion statistics.
277 custom_mapping (GulpMapping, optional): The custom mapping for the record. Defaults to None.
278 index_type_mapping (dict, optional): The elastic index type mapping. Defaults to None.
279 plugin (str, optional): The plugin name. Defaults to None.
280 plugin_params (GulpPluginParams, optional): The plugin parameters. Defaults to None.
281 flt (GulpIngestionFilter, optional): The ingestion filter. Defaults to None.
282 **kwargs: Additional keyword arguments.
283 Returns:
284 tuple[TmpIngestStats, bool]: A tuple containing the updated temporary ingestion statistics and a flag indicating whether to break the ingestion process.
285 """
286
287 # convert record to one or more GulpDocument objects
288 docs = await self._call_record_to_gulp_document_funcs(
289 operation_id=operation_id,
290 client_id=client_id,
291 context=context,
292 source=source,
293 fs=fs,
294 record=record,
295 record_idx=record_idx,
296 custom_mapping=custom_mapping,
297 index_type_mapping=index_type_mapping,
298 plugin=plugin,
299 plugin_params=plugin_params,
300 record_to_gulp_document_fun=my_record_to_gulp_document_fun,
301 **kwargs,
302 )
303 # ingest record
304 for d in docs:
305 fs = await self._ingest_record(index, d, fs, ws_id, req_id, flt, **kwargs)
306
307 status, _ = await GulpStats.update(
308 await collab_api.collab(),
309 req_id,
310 ws_id,
311 fs=fs.update(processed=len(docs)),
312 )
313 must_break = False
314 if status in [GulpRequestStatus.FAILED, GulpRequestStatus.CANCELED]:
315 must_break = True
316
317 return fs, must_break
318
320 self,
321 index: str,
322 source: str | dict,
323 skip_mapping: bool = False,
324 pipeline: ProcessingPipeline = None,
325 mapping_file: str = None,
326 mapping_id: str = None,
327 plugin_params: GulpPluginParams = None,
328 ) -> tuple[dict, GulpMapping]:
329 """
330 Initializes the ingestion plugin.
331
332 Args:
333 index (str): The name of the elasticsearch index.
334 source (str|dict): The source of the record (source file name or path, usually. may also be a dictionary.).
335 skip_mapping (bool, optional): Whether to skip mapping initialization (just prints source and plugin name, and return empty index mapping and custom mapping). Defaults to False.
336 pipeline (ProcessingPipeline, optional): The psyigma pipeline to borrow the mapping from, if any. Defaults to None (use mapping file only).
337 mapping_file (str, optional): name of the mapping file (i.e. 'windows.json') in the gulp/mapping_files directory. Defaults to None (use pipeline mapping only).
338 mapping_id (str, optional): The mapping ID (options.mapping_id) in the mapping file, to get a specific mapping. Defaults to None (use first).
339 plugin_params (GulpPluginParams, optional): The plugin parameters (i.e. to override mapping_file, mapping_id, ...). Defaults to None.
340 Returns:
341 tuple[dict, GulpMapping]: A tuple containing the elasticsearch index type mappings and the enriched GulpMapping (or an empty GulpMapping is no valid pipeline and/or mapping_file are provided).
342
343 """
344 # logger().debug("ingest_plugin_initialize: index=%s, pipeline=%s, mapping_file=%s, mapping_id=%s, plugin_params=%s" % (index, pipeline, mapping_file, mapping_id, plugin_params))
345 logger().debug(
346 "INITIALIZING INGESTION for source=%s, plugin=%s"
347 % (muty.string.make_shorter(source, 260), self.name())
348 )
349 if skip_mapping:
350 return {}, GulpMapping()
351
352 # get path of the mapping file in gulp/mapping_files folder
353 mapping_file_path = None
354 if mapping_file is not None:
355 mapping_file_path = gulp_utils.build_mapping_file_path(mapping_file)
356
357 if plugin_params is not None:
358 # override with plugin_params
359 if plugin_params.mapping_file is not None:
360 mapping_file_path = gulp_utils.build_mapping_file_path(
361 plugin_params.mapping_file
362 )
363 if plugin_params.mapping_id is not None:
364 mapping_id = plugin_params.mapping_id
365 if plugin_params.pipeline is not None:
366 pipeline = plugin_params.pipeline
367
368 index_type_mappings = await elastic_api.index_get_mapping(
369 elastic_api.elastic(), index, False
370 )
371 # index_type_mappings = await elastic_api.datastream_get_mapping(self.elastic, index + '-template')
372 m: GulpMapping = await mapping_helpers.get_enriched_mapping_for_ingestion(
373 pipeline=pipeline,
374 mapping_file_path=mapping_file_path,
375 mapping_id=mapping_id,
376 )
377 return index_type_mappings, m
378
380 self,
381 operation_id: int,
382 client_id: int,
383 context: str,
384 source: str,
385 fs: TmpIngestStats,
386 record: any,
387 record_idx: int,
388 custom_mapping: GulpMapping = None,
389 index_type_mapping: dict = None,
390 plugin: str = None,
391 plugin_params: GulpPluginParams = None,
392 record_to_gulp_document_fun: Callable = None,
393 **kwargs,
394 ) -> list[GulpDocument]:
395 """Stub function to call stacked plugins record_to_document_gulp_document.
396 Each function is called with the previously returned GulpDocument.
397
398 Args:
399 operation_id (int): the operation ID associated with the record
400 client_id (int): client ID performing the ingestion
401 context (str): context associated with the record
402 source (str): source of the record (source file name or path, usually)
403 fs (TmpIngestStats): _description_
404 record (any): a single record (first time) or a list of GulpDocument objects (in stacked plugins)
405 record_idx (int): The index of the record in source.
406 custom_mapping (GulpMapping, optional): The custom mapping to use for the conversion. Defaults to None.
407 index_type_mapping (dict, optional): elastic search index type mappings { "field": "type", ... }. Defaults to None.
408 plugin (str, optional): "agent.type" to be set in the GulpDocument. Defaults to None.
409 plugin_params (GulpPluginParams, optional): The plugin parameters to use, if any. Defaults to None.
410 record_to_gulp_document_fun (Callable, optional): function to parse record into a gulp document, if stacked this receives a list of GulpDocuments
411
412 Returns:
413 list[GulpDocument]: zero or more GulpDocument objects
414 """
415 # plugin_params=deepcopy(plugin_params)
416
417 if plugin_params is None:
418 plugin_params = GulpPluginParams()
419
420 docs = record
421
422 if record_to_gulp_document_fun is not None:
423 docs = await record_to_gulp_document_fun(
424 operation_id,
425 client_id,
426 context,
427 source,
428 fs,
429 record,
430 record_idx,
431 custom_mapping,
432 index_type_mapping,
433 plugin,
434 plugin_params,
435 **kwargs,
436 )
437
438 for fun in plugin_params.record_to_gulp_document_fun:
439 docs = await fun(
440 operation_id,
441 client_id,
442 context,
443 source,
444 fs,
445 docs,
446 record_idx,
447 custom_mapping,
448 index_type_mapping,
449 plugin,
450 plugin_params,
451 **kwargs,
452 )
453
454 if docs is None:
455 return []
456 return docs
457
459 self,
460 operation_id: int,
461 client_id: int,
462 context: str,
463 source: str,
464 fs: TmpIngestStats,
465 record: any,
466 record_idx: int,
467 custom_mapping: GulpMapping = None,
468 index_type_mapping: dict = None,
469 plugin: str = None,
470 plugin_params: GulpPluginParams = None,
471 **kwargs,
472 ) -> list[GulpDocument]:
473 """
474 Converts a record to one or more GulpDocument objects based on the provided index mappings.
475
476 Args:
477 operation_id (int): The operation ID associated with the record.
478 client_id (int): The client ID associated with the record.
479 context (str): The context associated with the record.
480 source (str): The source of the record (source file name or path, usually).
481 fs (TmpIngestStats): The temporary ingestion statistics (may be updated on return).
482 record (any): record to convert, plugin dependent format: note that here stacked plugins receives a list of GulpDocument objects instead (since the original record may generate one or more documents).
483 record_idx (int): The index of the record in source.
484 custom_mapping (GulpMapping, optional): The custom mapping to use for the conversion. Defaults to None.
485 index_type_mapping (dict, optional): elastic search index type mappings { "ecs_field": "type", ... }. Defaults to None.
486 plugin (str, optional): "agent.type" to be set in the GulpDocument. Defaults to None.
487 plugin_params (GulpPluginParams, optional): The plugin parameters to use, if any. Defaults to None.
488 extra (dict, optional): Additional fields to add to the GulpDocument (after applying mapping). Defaults to None.
489 **kwargs: Additional keyword arguments:
490
491 Returns:
492 list[GulDocument]: The converted GulpDocument objects or None if an exception occurred (fs is updated then).
493
494 Raises:
495 NotImplementedError: This method is not implemented yet.
496 """
497 raise NotImplementedError("not implemented!")
498
async def ingest(
    self,
    index: str,
    req_id: str,
    client_id: int,
    operation_id: int,
    context: str,
    source: str | list[dict],
    ws_id: str,
    plugin_params: GulpPluginParams = None,
    flt: GulpIngestionFilter = None,
    **kwargs,
) -> GulpRequestStatus:
    """
    Ingest a source through the plugin.

    The base implementation only records the request coordinates on the instance
    (req_id, client_id, operation_id, context, ws_id, index) and returns nothing.

    NOTE: implementers should call super().ingest() in their implementation.
    NOTE: this function *SHOULD NOT* raise exceptions

    Args:
        index (str): name of the elasticsearch index to ingest into.
        req_id (str): request ID related to this ingestion (must exist on the collab db).
        client_id (int): ID of the client performing the ingestion.
        operation_id (int): ID of the operation this ingestion belongs to.
        context (str): context related to this ingestion.
        source (str|list[dict]): path of the file to ingest, or a list of events dicts (not used here).
        ws_id (str): the websocket ID
        plugin_params (GulpPluginParams, optional): additional parameters for the ingestion (not used here). Defaults to None.
        flt (GulpIngestionFilter, optional): filter to apply to this ingestion (not used here). Defaults to None.
        kwargs: additional arguments if any (not used here)
    """
    (
        self.req_id,
        self.client_id,
        self.operation_id,
        self.context,
        self.ws_id,
        self.index,
    ) = (req_id, client_id, operation_id, context, ws_id, index)
537
async def pipeline(
    self, plugin_params: GulpPluginParams = None, **kwargs
) -> ProcessingPipeline:
    """
    Provide the pysigma processing pipeline of the plugin, if it has one.

    Args:
        plugin_params (GulpPluginParams, optional): additional parameters for the pipeline. Defaults to None.
        kwargs: additional arguments if any.
    Returns:
        ProcessingPipeline: the processing pipeline.
    Raises:
        NotImplementedError: always, in the base implementation.
    """
    reason = "not implemented!"
    raise NotImplementedError(reason)
551
def cleanup(self) -> None:
    """
    Optional hook to run on unload; the base implementation does nothing.
    """
    return None
557
559 self,
560 fme: list[FieldMappingEntry],
561 idx: int,
562 operation_id: int,
563 context: str,
564 plugin: str,
565 client_id: int,
566 raw_event: str,
567 original_id: str,
568 src_file: str,
569 timestamp: int = None,
570 timestamp_nsec: int = None,
571 event_code: str = None,
572 cat: list[str] = None,
573 duration_nsec: int = 0,
574 gulp_log_level: GulpLogLevel = None,
575 original_log_level: str = None,
576 remove_raw_event: bool = False,
577 **kwargs,
578 ) -> list[GulpDocument]:
579 """
580 build one or more GulpDocument objects from a list of FieldMappingEntry objects:
581
582 this function creates as many GulpDocument objects as there are FieldMappingEntry objects with is_timestamp=True.
583 if no FieldMappingEntry object has is_timestamp=True, it creates a single GulpDocument object with the first FieldMappingEntry object.
584 """
585 docs: list[GulpDocument] = []
586 append_doc = docs.append # local variable for faster access
587
588 common_params = {
589 "idx": idx,
590 "operation_id": operation_id,
591 "context": context,
592 "plugin": plugin,
593 "client_id": client_id,
594 "raw_event": raw_event,
595 "original_id": original_id,
596 "src_file": src_file,
597 "timestamp": timestamp,
598 "timestamp_nsec": timestamp_nsec,
599 "event_code": event_code,
600 "cat": cat,
601 "duration_nsec": duration_nsec,
602 "gulp_log_level": gulp_log_level,
603 "original_log_level": original_log_level,
604 **kwargs,
605 }
606 for f in fme:
607 # print("%s\n\n" % (f))
608 # for each is_timestamp build a gulpdocument with all the fields in fme
609 if f.is_timestamp:
610 d = GulpDocument(fme=fme, f=f, **common_params)
611 if remove_raw_event:
612 d.original_event = None
613
614 # print("%s\n\n" % (d))
615 append_doc(d)
616
617 if len(docs) == 0:
618 # create a document with the given timestamp in timestamp/timestamp_nsec (if any, either it will be set to 0/invalid)
619 d = GulpDocument(fme=fme, **common_params)
620 if remove_raw_event:
621 d.original_event = None
622 append_doc(d)
623
624 return docs
625
def get_unmapped_field_name(self, field: str) -> str:
    """
    Build the document key under which an unmapped source field is stored.

    Parameters:
    - field (str): name of the field.

    Returns:
    - str: "<elastic_api.UNMAPPED_PREFIX>.<field>", or the field name unchanged
      when elastic_api.UNMAPPED_PREFIX is empty/unset.
    """
    prefix = elastic_api.UNMAPPED_PREFIX
    return f"{prefix}.{field}" if prefix else field
640
641 def _type_checks(self, v: any, k: str, index_type_mapping: dict) -> any:
642 """
643 check if the value should be fixed based on the index type mapping
644
645 Args:
646 v (any): The value to check.
647 k (str): The mapped field (i.e. "user.id", may also be an unmapped (i.e. "gulp.unmapped") field)
648 index_type_mapping (dict): The elasticsearch index key->type mappings.
649 """
650 if k not in index_type_mapping:
651 # logger().debug("key %s not found in index_type_mapping" % (k))
652 return str(v)
653
654 index_type = index_type_mapping[k]
655 if index_type == "long":
656 # logger().debug("converting %s:%s to long" % (k, v))
657 if isinstance(v, str):
658 if v.isnumeric():
659 return int(v)
660 if v.startswith("0x"):
661 return int(v, 16)
662 return v
663
664 if index_type == "date" and isinstance(v, str) and v.lower().startswith("0x"):
665 # convert hex to int
666 return int(v, 16)
667
668 if index_type == "keyword" or index_type == "text":
669 # logger().debug("converting %s:%s to keyword" % (k, v))
670 return str(v)
671
672 if index_type == "ip":
673 # logger().debug("converting %s:%s to ip" % (k, v))
674 if "local" in v.lower():
675 return "127.0.0.1"
676 try:
677 ipaddress.ip_address(v)
678 except ValueError as ex:
679 logger().exception(ex)
680 return None
681
682 # add more types here if needed ...
683 # logger().debug("returning %s:%s" % (k, v))
684 return str(v)
685
687 self,
688 plugin_params: GulpPluginParams,
689 custom_mapping: GulpMapping,
690 source_key: str,
691 v: Any,
692 index_type_mapping: dict = None,
693 ignore_custom_mapping: bool = False,
694 **kwargs,
695 ) -> list[FieldMappingEntry]:
696 """
697 map source key to a field mapping entry with "result": {mapped_key: v}
698
699 Args:
700 plugin_params (GulpPluginParams): The plugin parameters.
701 custom_mapping (GulpMapping): The custom mapping.
702 source_key (str): The key to look for(=the event record key to be mapped) in the custom_mapping dictionary
703 v (any): value to set for mapped key/s.
704 index_type_mapping (dict, optional): The elasticsearch index key->type mappings. Defaults to None.
705 ignore_custom_mapping (bool, optional): Whether to ignore custom_mapping and directly map source_key to v. Defaults to False.
706 kwargs: Additional keyword arguments.
707
708 Returns:
709 list[FieldMappingEntry]: zero or more FieldMappingEntry objects with "result" set.
710 """
711 # get mapping and option from custom_mapping
712 if index_type_mapping is None:
713 index_type_mapping = {}
714 # logger().debug('len index type mapping=%d' % (len(index_type_mapping)))
715 mapping_dict: dict = custom_mapping.fields
716 mapping_options = (
717 custom_mapping.options
718 if custom_mapping.options is not None
719 else GulpMappingOptions()
720 )
721
722 # basic checks
723 if v == "-" or v is None:
724 return []
725
726 if isinstance(v, str):
727 v = v.strip()
728 if not v and mapping_options.ignore_blanks:
729 # not adding blank strings
730 return []
731
732 # fix value if needed, and add to extra
733 if ignore_custom_mapping or (
734 plugin_params is not None and plugin_params.ignore_mapping_ingest
735 ):
736 # direct mapping, no need to check custom_mappings
737 return [FieldMappingEntry(result={source_key: v})]
738
739 if source_key not in mapping_dict:
740 # logger().error('key "%s" not found in custom mapping, mapping_dict=%s!' % (source_key, muty.string.make_shorter(str(mapping_dict))))
741 # key not found in custom_mapping, check if we have to map it anyway
742 if not mapping_options.ignore_unmapped:
743 return [
744 FieldMappingEntry(
745 result={self.get_unmapped_field_name(source_key): str(v)}
746 )
747 ]
748
749 # there is a mapping defined to be processed
750 fm: FieldMappingEntry = mapping_dict[source_key]
751 map_to_list = (
752 [fm.map_to] if isinstance(fm.map_to, (str, type(None))) else fm.map_to
753 )
754
755 # in the end, this function will return a list of FieldMappingEntry objects with "result" set: these results will be used to create the GulpDocument object
756 fme_list: list[FieldMappingEntry] = []
757 for k in map_to_list:
758 # make a copy of fme without using deepcopy)
759 dest_fm = FieldMappingEntry(
760 is_timestamp=fm.is_timestamp,
761 event_code=fm.event_code,
762 do_multiply=fm.do_multiply,
763 is_timestamp_chrome=fm.is_timestamp_chrome,
764 is_variable_mapping=fm.is_variable_mapping,
765 result={},
766 )
767
768 # check if it is a number and/or a timestamp (including chrome timestamp, which is a special case)
769 is_numeric = isinstance(v, int) or str(v).isnumeric()
770 if is_numeric:
771 v = int(v)
772 # ensure chrome timestamp is properly converted to nanos
773 # logger().debug('***** is_numeric, v=%d' % (v))
774 if fm.is_timestamp_chrome:
775 v = int(muty.time.chrome_epoch_to_nanos(v))
776 # logger().debug('***** is_timestamp_chrome, v nsec=%d' % (v))
777
778 if fm.do_multiply is not None:
779 # apply a multipler if any (must turn v to nanoseconds)
780 # logger().debug("***** is_numeric, multiply, v=%d" % (v))
781 v = int(v * fm.do_multiply)
782 # logger().debug("***** is_numeric, AFTER multiply, v=%d" % (v))
783
784 elif isinstance(v, str) and fm.is_timestamp:
785 v = int(
786 muty.time.string_to_epoch_nsec(
787 v,
788 utc=mapping_options.timestamp_utc,
789 dayfirst=mapping_options.timestamp_dayfirst,
790 yearfirst=mapping_options.timestamp_yearfirst,
791 )
792 )
793 # logger().debug('***** str and is_timestamp, v nsec=%d' % (v))
794 if fm.is_timestamp:
795 # it's a timestamp, another event will be generated
796 vv = muty.time.nanos_to_millis(v)
797 dest_fm.result["@timestamp"] = vv
798 dest_fm.result["@timestamp_nsec"] = v
799 # logger().debug('***** timestamp nanos, v=%d' % (v))
800 # logger().debug('***** timestamp to millis, v=%d' % (vv))
801
802 if fm.is_timestamp or fm.is_timestamp_chrome:
803 # logger().debug('***** timestamp or timestamp_chrome, v=%d' % (v))
804 if v < 0:
805 # logger().debug('***** adding invalid timestamp')
806 v = 0
807 GulpDocument.add_invalid_timestamp(dest_fm.result)
808 if k is not None:
809 # also add to mapped key
810 dest_fm.result[k] = v
811 else:
812 # not a timestamp, map
813 if k is None:
814 # add unmapped
815 k = self.get_unmapped_field_name(source_key)
816 else:
817 v = self._type_checks(v, k, index_type_mapping)
818 dest_fm.result[k] = v
819
820 fme_list.append(dest_fm)
821 """
822 logger().debug('FME LIST FOR THIS RECORD:')
823 for p in fme_list:
824 logger().debug(p)
825 logger().debug('---------------------------------')
826 """
827 return fme_list
828
830 self, docs: list[dict], flt: GulpIngestionFilter = None
831 ) -> list[dict]:
832 """
833 Builds the ingestion chunk for the websocket, filtering if needed.
834 """
835 # logger().debug("building ingestion chunk, flt=%s" % (flt))
836 if not docs:
837 return []
838
839 ws_docs = [
840 {
841 "_id": doc["_id"],
842 "@timestamp": doc["@timestamp"],
843 "gulp.source.file": doc["gulp.source.file"],
844 "event.duration": doc["event.duration"],
845 "gulp.context": doc["gulp.context"],
846 "gulp.log.level": doc.get("gulp.log.level", int(GulpLogLevel.INFO)),
847 "event.category": doc.get("event.category", None),
848 "event.code": doc["event.code"],
849 "gulp.event.code": doc["gulp.event.code"],
850 }
851 for doc in docs
852 if elastic_api.filter_doc_for_ingestion(
853 doc, flt, ignore_store_all_documents=True
854 )
855 == GulpEventFilterResult.ACCEPT
856 ]
857
858 return ws_docs
859
860 async def _flush_buffer(
861 self,
862 index: str,
863 fs: TmpIngestStats,
864 ws_id: str,
865 req_id: str,
866 flt: GulpIngestionFilter = None,
867 wait_for_refresh: bool = False,
868 ) -> TmpIngestStats:
869 """
870 NOTE: errors appended by this function are intended as INGESTION errors:
871 it means something wrong with the format of the event, and must be fixed ASAP if this happens.
872 ideally, function should NEVER append errors and the errors total should be the same before and after this function returns (this function may only change the skipped total, which means some duplicates were found).
873 """
874 if len(self.buffer) == 0:
875 # already flushed
876 return fs
877
878 # logger().debug('flushing ingestion buffer, len=%d' % (len(self.buffer)))
879
880 skipped, failed, failed_ar, ingested_docs = await elastic_api.ingest_bulk(
881 elastic_api.elastic(),
882 index,
883 self.buffer,
884 flt=flt,
885 wait_for_refresh=wait_for_refresh,
886 )
887 # print(json.dumps(ingested_docs, indent=2))
888
889 if failed > 0:
890 if config.debug_abort_on_elasticsearch_ingestion_error():
891 raise Exception(
892 "elasticsearch ingestion errors means GulpDocument contains invalid data, review errors on collab db!"
893 )
894
895 self.buffer = []
896
897 # build ingestion chunk
898 ws_docs = self._build_ingestion_chunk_for_ws(ingested_docs, flt)
899
900 # send ingested docs to websocket
901 if len(ws_docs) > 0:
902 ws_api.shared_queue_add_data(
903 WsQueueDataType.INGESTION_CHUNK,
904 req_id,
905 {"plugin": self.name(), "events": ws_docs},
906 ws_id=ws_id,
907 )
908
909 # update stats
910 fs = fs.update(
911 failed=failed,
912 skipped=skipped,
913 ingest_errors=failed_ar,
914 )
915 return fs
916
async def _ingest_record(
    self,
    index: str,
    doc: GulpDocument | dict,
    fs: TmpIngestStats,
    ws_id: str,
    req_id: str,
    flt: GulpIngestionFilter = None,
    flush_enabled: bool = True,
    **kwargs,
) -> TmpIngestStats:
    """
    Queue a document in the ingestion buffer, flushing the buffer (which writes to
    elasticsearch) once it holds at least "ingestion_buffer_size" documents
    (configuration value, 1000 when not set).

    Args:
        index (str): the elasticsearch index to ingest into.
        doc (GulpDocument|dict): the document to queue.
        fs (TmpIngestStats): the temporary ingestion statistics.
        ws_id (str): the websocket ID.
        req_id (str): the request ID.
        flt (GulpIngestionFilter, optional): the ingestion filter. Defaults to None.
        flush_enabled (bool, optional): when False the document is only queued, never flushed. Defaults to True.
        kwargs: additional arguments if any (not used here)
    Returns:
        TmpIngestStats: fs unchanged when no flush happened, either the stats returned by the flush.
    """
    max_buffered = config.config().get("ingestion_buffer_size", 1000)
    self.buffer.append(doc)

    if len(self.buffer) < max_buffered or not flush_enabled:
        # keep buffering
        return fs

    # time to flush
    return await self._flush_buffer(index, fs, ws_id, req_id, flt)
938
940 self, fs: TmpIngestStats, source: str | dict, ex: Exception | str
941 ) -> TmpIngestStats:
942 """
943 whole source failed ingestion (error happened before the record parsing loop), helper to update stats
944 """
945 logger().exception(
946 "PARSER FAILED: source=%s, ex=%s"
947 % (muty.string.make_shorter(str(source), 260), ex)
948 )
949 fs = fs.update(ingest_errors=[ex], parser_failed=1)
950 return fs
951
def _record_failed(
    self, fs: TmpIngestStats, entry: Any, source: str | dict, ex: Exception | str
) -> TmpIngestStats:
    """
    record failed ingestion (in the record parser loop), helper to update stats

    Args:
        fs (TmpIngestStats): The temporary ingestion statistics.
        entry (Any): The entry that failed (currently unused).
        source (str|dict): The source of the record (currently unused).
        ex (Exception|str): The exception that caused the failure.
    Returns:
        TmpIngestStats: The updated temporary ingestion statistics.
    """
    # NOTE(review): the "def" line of this method is absent from the listing; name,
    # parameters and return type were rebuilt from the signature index. Confirm
    # against the repository that the method is not declared "async".
    # "entry" is annotated with typing.Any: the lowercase builtin any() is a function, not a type.
    return fs.update(failed=1, ingest_errors=[ex])
969
async def _finish_ingestion(
    self,
    index: str,
    source: str | dict,
    req_id: str,
    client_id: int,
    ws_id: str,
    fs: TmpIngestStats,
    flt: GulpIngestionFilter = None,
) -> GulpRequestStatus:
    """
    to be called whenever ingest() must exit: flushes the buffer and updates the ingestion stats

    Args:
        index (str): index forwarded to _flush_buffer.
        source (str|dict): the ingested source, only used (shortened to 260 chars) in log messages.
        req_id (str): request id, forwarded to _flush_buffer and GulpStats.update.
        client_id (int): client id, only used in the final log message.
        ws_id (str): websocket id, forwarded to _flush_buffer and GulpStats.update.
        fs (TmpIngestStats): the temporary ingestion statistics accumulated so far.
        flt (GulpIngestionFilter, optional): filter forwarded to _flush_buffer. Defaults to None.
    Returns:
        GulpRequestStatus: the status returned by GulpStats.update.
    """
    try:
        # finally flush ingestion buffer
        fs = await self._flush_buffer(index, fs, ws_id, req_id, flt)
        logger().info(
            "INGESTION DONE FOR source=%s,\n\tclient_id=%d (processed(ingested)=%d, failed=%d, skipped=%d, errors=%d, parser_errors=%d)"
            % (
                muty.string.make_shorter(str(source), 260),
                client_id,
                fs.ev_processed,
                fs.ev_failed,
                fs.ev_skipped,
                len(fs.ingest_errors),
                fs.parser_failed,
            )
        )
    except Exception as ex:
        # a failing final flush is recorded in the stats instead of being raised
        fs = fs.update(ingest_errors=[ex])
        logger().exception(
            "FAILED finalizing ingestion for source=%s"
            % (muty.string.make_shorter(str(source), 260))
        )
    finally:
        # stats are always pushed, flagged as final for this file
        status, _ = await GulpStats.update(
            await collab_api.collab(),
            req_id,
            ws_id,
            fs=fs,
            force=True,
            file_done=True,
        )

    # kept outside the "finally" block: a return inside it would silently discard
    # an in-flight BaseException (e.g. task cancellation)
    return status
1015
1016
def _get_plugin_path(
    plugin: str, plugin_type: GulpPluginType = GulpPluginType.INGESTION
) -> str:
    """
    Resolve a plugin name to the file implementing it.

    Args:
        plugin (str): The plugin name, without extension.
        plugin_type (GulpPluginType, optional): Selects the plugins folder through
            config.path_plugins(). Defaults to GulpPluginType.INGESTION.
    Returns:
        str: path of "<plugin>.py" if it exists, otherwise path of "<plugin>.pyc".

    Raises:
        ObjectNotFound: If neither file exists.
    """
    # TODO: on license manager, disable plain .py load (only encrypted pyc)
    # get path according to plugin type
    path_plugins = config.path_plugins(plugin_type)

    # try plain .py first
    ppy = muty.file.safe_path_join(path_plugins, plugin + ".py")
    if muty.file.exists(ppy):
        return ppy

    # try pyc
    ppyc = muty.file.safe_path_join(path_plugins, plugin + ".pyc")
    if muty.file.exists(ppyc):
        return ppyc

    raise ObjectNotFound("plugin %s not found (tried %s, %s)" % (plugin, ppy, ppyc))
1036
1037
def load_plugin(
    plugin: str,
    plugin_type: GulpPluginType = GulpPluginType.INGESTION,
    ignore_cache: bool = False,
    **kwargs,
) -> PluginBase:
    """
    Load a plugin from a given path or from the default plugin path.

    Args:
        plugin (str): The name or path of the plugin to load.
        plugin_type (GulpPluginType, optional): The type of the plugin to load. Defaults to GulpPluginType.INGESTION.
            this is ignored if the plugin is a path (contains "/") or if "plugin_cache" is enabled and the plugin is already cached.
        ignore_cache (bool, optional): Whether to ignore the plugin cache. Defaults to False.
        **kwargs (dict, optional): Additional keyword arguments, forwarded to the Plugin constructor.
    Returns:
        PluginBase: The loaded plugin.

    Raises:
        ObjectNotFound: If a plugin given by name is not found in the plugins folder.
        Exception: If the plugin could not be loaded.
    """
    logger().debug("load_plugin %s ..." % (plugin))

    if "/" not in plugin and plugin.lower().endswith((".py", ".pyc")):
        # bare file name: remove extension
        plugin = plugin.rsplit(".", 1)[0]

    if ignore_cache:
        # do not even query the cache: a lookup would only log a misleading cache miss
        logger().debug("ignoring cache for plugin %s" % (plugin))
        m = None
    else:
        m = plugin_cache_get(plugin)

    if m is not None:
        # NOTE(review): on a cache hit the instance receives the plugin name/key as
        # "path", while a fresh load below receives the resolved file path.
        return m.Plugin(plugin, **kwargs)

    if "/" in plugin:
        # plugin is a path
        path = muty.file.abspath(plugin)
    else:
        # uses plugin_type to load from the correct subfolder
        path = _get_plugin_path(plugin, plugin_type=plugin_type)

    try:
        m: ModuleType = muty.dynload.load_dynamic_module_from_file(path)
    except Exception as ex:
        raise Exception(
            "load_dynamic_module_from_file() failed, could not load plugin %s !"
            % (path)
        ) from ex

    mod: PluginBase = m.Plugin(path, **kwargs)
    logger().debug("loaded plugin: %s" % (mod.name()))

    # the module (not the instance) is cached, keyed by the name/path given by the caller
    plugin_cache_add(m, plugin)

    return mod
1093
1094
async def list_plugins() -> list[dict]:
    """
    List all available plugins.

    Every "*.py*" file found (recursively) under config.path_plugins() is loaded,
    described and unloaded again; files which fail to load or to be described are
    logged and left out of the result.

    Returns:
        list[dict]: The list of available plugins.
    """
    path_plugins = config.path_plugins()
    files = await muty.file.list_directory_async(path_plugins, "*.py*", recursive=True)
    plugins: list[dict] = []
    for f in files:
        if "__init__" in f or "__pycache__" in f:
            # package markers and bytecode cache folders are not plugins
            continue

        try:
            p = load_plugin(f)
            try:
                plugins.append(
                    {
                        "display_name": p.name(),
                        "type": str(p.type()),
                        "desc": p.desc(),
                        "filename": os.path.basename(p.path),
                        "internal": p.internal(),
                        "options": [o.to_dict() for o in p.options()],
                        "depends_on": p.depends_on(),
                        "tags": p.tags(),
                        "event_type_field": p.event_type_field(),
                        "version": p.version(),
                    }
                )
            finally:
                # unload even when describing the plugin raised
                unload_plugin(p)
        except Exception as ex:
            logger().exception(ex)
            logger().error("could not load plugin %s" % (f))

    return plugins
1129
1130
async def get_plugin_tags(plugin: str) -> list[str]:
    """
    Get the tags for a given (ingestion) plugin.

    Args:
        plugin (str): The name of the plugin to get the tags for.
    Returns:
        list[str]: The tags for the given plugin.

    Raises:
        Exception: If the plugin could not be loaded.
    """
    p = load_plugin(plugin)
    try:
        return p.tags()
    finally:
        # pair the load with an unload, as list_plugins() does
        unload_plugin(p)
1143
1144
def unload_plugin(mod: PluginBase) -> None:
    """
    Unloads a plugin instance by calling its `cleanup` method.

    This is a no-op when the plugin cache is enabled or when mod is None.

    NOTE: mod should be considered **no more valid** after this function returns.

    Args:
        mod (PluginBase): The plugin to unload.

    Returns:
        None
    """
    if config.plugin_cache_enabled():
        # with the cache enabled nothing is cleaned up here
        return

    if mod is None:
        return

    logger().debug("unloading plugin: %s" % (mod.name()))
    mod.cleanup()
1168
1169
def plugin_cache_clear() -> None:
    """
    Clear the process's own plugin cache.

    Does nothing when the plugin cache is disabled in the configuration.

    Returns:
        None
    """
    global _cache
    if not config.plugin_cache_enabled():
        return

    # rebind to a fresh dict, dropping every cached module
    _cache = {}
1182
1183
def plugin_cache_remove(plugin: str) -> None:
    """
    Remove a plugin from the process's own plugin cache.

    Does nothing when the plugin cache is disabled or the plugin is not cached.
    The cached module is only dropped from the cache, no cleanup is run on it.

    Args:
        plugin (str): The name/path of the plugin to remove from the cache.

    Returns:
        None
    """
    if not config.plugin_cache_enabled():
        return

    if plugin not in _cache:
        return

    logger().debug("removing plugin %s from cache" % (plugin))
    del _cache[plugin]
1204
1205
def plugin_cache_add(m: ModuleType, name: str) -> None:
    """
    Store a plugin module in the process's own plugin cache.

    Nothing happens when the plugin cache is disabled or when an entry for
    name is already present: an existing entry is never replaced.

    Args:
        m (ModuleType): The plugin module to cache.
        name (str): The name/path of the plugin, used as cache key.

    Returns:
        None
    """
    if not config.plugin_cache_enabled():
        return

    if _cache.get(name) is not None:
        # already cached, keep the existing module
        return

    logger().debug("adding plugin %s (%s) to cache" % (name, m))
    _cache[name] = m
1225
1226
def plugin_cache_get(plugin: str) -> ModuleType:
    """
    Look up a plugin module in the process's own plugin cache.

    Args:
        plugin (str): The name/path of the plugin, used as cache key.

    Returns:
        ModuleType: The cached plugin module, or None when the plugin cache is
            disabled or the plugin is not cached.
    """
    if not config.plugin_cache_enabled():
        return None

    cached = _cache.get(plugin)
    if cached is None:
        # cache miss
        logger().warning("plugin %s not found in cache" % (plugin))
        return None

    logger().debug("found plugin %s in cache" % (plugin))
    return cached
list[str] depends_on(self)
Definition plugin.py:113
tuple[int, GulpRequestStatus] query(self, int operation_id, int client_id, int user_id, str username, str ws_id, str req_id, GulpPluginParams plugin_params, GulpQueryFilter flt, GulpQueryOptions options=None)
Definition plugin.py:130
ProcessingPipeline sigma_plugin_initialize(self, ProcessingPipeline pipeline=None, str mapping_file=None, str mapping_id=None, str product=None, GulpPluginParams plugin_params=None)
Definition plugin.py:206
GulpRequestStatus _finish_ingestion(self, str index, str|dict source, str req_id, int client_id, str ws_id, TmpIngestStats fs, GulpIngestionFilter flt=None)
Definition plugin.py:979
list[dict] _build_ingestion_chunk_for_ws(self, list[dict] docs, GulpIngestionFilter flt=None)
Definition plugin.py:831
TmpIngestStats _record_failed(self, TmpIngestStats fs, any entry, str|dict source, Exception|str ex)
Definition plugin.py:954
str version(self)
Definition plugin.py:90
TmpIngestStats _parser_failed(self, TmpIngestStats fs, str|dict source, Exception|str ex)
Definition plugin.py:941
list[GulpDocument] _build_gulpdocuments(self, list[FieldMappingEntry] fme, int idx, int operation_id, str context, str plugin, int client_id, str raw_event, str original_id, str src_file, int timestamp=None, int timestamp_nsec=None, str event_code=None, list[str] cat=None, int duration_nsec=0, GulpLogLevel gulp_log_level=None, str original_log_level=None, bool remove_raw_event=False, **kwargs)
Definition plugin.py:578
list[GulpDocument] _call_record_to_gulp_document_funcs(self, int operation_id, int client_id, str context, str source, TmpIngestStats fs, any record, int record_idx, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, Callable record_to_gulp_document_fun=None, **kwargs)
Definition plugin.py:394
list[GulpDocument] record_to_gulp_document(self, int operation_id, int client_id, str context, str source, TmpIngestStats fs, any record, int record_idx, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, **kwargs)
Definition plugin.py:472
TmpIngestStats _flush_buffer(self, str index, TmpIngestStats fs, str ws_id, str req_id, GulpIngestionFilter flt=None, bool wait_for_refresh=False)
Definition plugin.py:868
GulpPluginType type(self)
Definition plugin.py:84
list[FieldMappingEntry] _map_source_key(self, GulpPluginParams plugin_params, GulpMapping custom_mapping, str source_key, Any v, dict index_type_mapping=None, bool ignore_custom_mapping=False, **kwargs)
Definition plugin.py:695
tuple[dict, GulpMapping] ingest_plugin_initialize(self, str index, str|dict source, bool skip_mapping=False, ProcessingPipeline pipeline=None, str mapping_file=None, str mapping_id=None, GulpPluginParams plugin_params=None)
Definition plugin.py:328
None __init__(self, str path, **kwargs)
Definition plugin.py:55
None cleanup(self)
Definition plugin.py:552
GulpRequestStatus ingest(self, str index, str req_id, int client_id, int operation_id, str context, str|list[dict] source, str ws_id, GulpPluginParams plugin_params=None, GulpIngestionFilter flt=None, **kwargs)
Definition plugin.py:511
tuple[TmpIngestStats, bool] _process_record(self, str index, any record, int record_idx, Callable my_record_to_gulp_document_fun, str ws_id, str req_id, int operation_id, int client_id, str context, str source, TmpIngestStats fs, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, GulpIngestionFilter flt=None, **kwargs)
Definition plugin.py:262
tuple[GulpMapping, GulpPluginParams] _process_plugin_params(self, GulpMapping custom_mapping, GulpPluginParams plugin_params=None)
Definition plugin.py:171
str get_unmapped_field_name(self, str field)
Definition plugin.py:626
bool internal(self)
Definition plugin.py:150
ProcessingPipeline pipeline(self, GulpPluginParams plugin_params=None, **kwargs)
Definition plugin.py:540
list[str] tags(self)
Definition plugin.py:156
any _type_checks(self, any v, str k, dict index_type_mapping)
Definition plugin.py:641
str event_type_field(self)
Definition plugin.py:107
TmpIngestStats _ingest_record(self, str index, GulpDocument|dict doc, TmpIngestStats fs, str ws_id, str req_id, GulpIngestionFilter flt=None, bool flush_enabled=True, **kwargs)
Definition plugin.py:927
list[GulpPluginOption] options(self)
Definition plugin.py:77
list[str] get_plugin_tags(str plugin)
Definition plugin.py:1131
None unload_plugin(PluginBase mod)
Definition plugin.py:1145
None plugin_cache_add(ModuleType m, str name)
Definition plugin.py:1206
None plugin_cache_remove(str plugin)
Definition plugin.py:1184
list[dict] list_plugins()
Definition plugin.py:1095
None plugin_cache_clear()
Definition plugin.py:1170
ModuleType plugin_cache_get(str plugin)
Definition plugin.py:1227
str _get_plugin_path(str plugin, GulpPluginType plugin_type=GulpPluginType.INGESTION)
Definition plugin.py:1019
PluginBase load_plugin(str plugin, GulpPluginType plugin_type=GulpPluginType.INGESTION, bool ignore_cache=False, **kwargs)
Definition plugin.py:1043