(g)ULP!
Loading...
Searching...
No Matches
plugin.py
Go to the documentation of this file.
1"""Gulp plugin base class and plugin utilities.
2"""
3
4import ipaddress
5import logging
6import os
7from abc import ABC, abstractmethod
8from copy import deepcopy
9from types import ModuleType
10from typing import Any, Callable
11
12import muty.crypto
13import muty.dynload
14import muty.file
15import muty.jsend
16import muty.string
17import muty.time
18from opensearchpy import AsyncOpenSearch as AsyncElasticsearch
19from sigma.processing.pipeline import ProcessingPipeline
20from sqlalchemy.ext.asyncio import AsyncEngine
21
22import gulp.api.elastic_api as elastic_api
23import gulp.api.mapping.helpers as mappings_helper
24import gulp.api.rest.ws as ws_api
25import gulp.config as config
26import gulp.utils as gulp_utils
27from gulp.api.collab.base import GulpRequestStatus
28from gulp.api.collab.stats import GulpStats, TmpIngestStats
29from gulp.api.elastic.structs import GulpDocument, GulpIngestionFilter
30from gulp.api.mapping.models import FieldMappingEntry, GulpMapping, GulpMappingOptions
31from gulp.api.rest.ws import WsQueueDataType
32from gulp.defs import (
33 GulpEventFilterResult,
34 GulpLogLevel,
35 GulpPluginType,
36 InvalidArgument,
37 ObjectNotFound,
38)
39from gulp.plugin_internal import GulpPluginOption, GulpPluginParams
40
41_logger: logging.Logger = None
42
43# caches plugin modules for the running process
44_cache: dict = {}
45
46
def init(logger: logging.Logger):
    """
    Sets up the plugin base module.

    Args:
        logger (logging.Logger): stored in the module-level `_logger`, which PluginBase.logger() returns.
    """
    global _logger
    _logger = logger
53
54
55class PluginBase(ABC):
56 """
57 Base class for all Gulp plugins.
58 """
59
61 self,
62 path: str,
63 collab: AsyncEngine = None,
64 elastic: AsyncElasticsearch = None,
65 **kwargs,
66 ) -> None:
67 """
68 Initializes a new instance of the PluginBase class.
69
70 Args:
71 path (str): The path to the plugin.
72 collab: instance of sqlalchemy client
73 elastic: instance of elasticsearch client
74 kwargs: additional arguments if any
75 """
76 self.req_id: str = None
77 self.index: str = None
78 self.client_id: str = None
79 self.operation_id: str = None
80 self.context: str = (None,)
81 self.ws_id: str = None
82 self.path = path
83 self.collab = collab
84 self.elastic = elastic
85
86 self.buffer: list[GulpDocument] = []
87 for k, v in kwargs.items():
88 self.__dict__[k] = v
89
90 # initialize mapping helper
91 mappings_helper.init(_logger)
92 super().__init__()
93
94 @classmethod
95 def logger(cls) -> logging.Logger:
96 global _logger
97 return _logger
98
99 def options(self) -> list[GulpPluginOption]:
100 """
101 return available GulpPluginOption list (plugin specific parameters)
102 """
103 return []
104
105 @abstractmethod
106 def type(self) -> GulpPluginType:
107 """
108 Returns the plugin type.
109 """
110
111 @abstractmethod
112 def version(self) -> str:
113 """
114 Returns plugin version.
115 """
116
117 @abstractmethod
118 def desc(self) -> str:
119 """
120 Returns a description of the plugin.
121 """
122
123 @abstractmethod
124 def name(self) -> str:
125 """
126 Returns the name of the plugin.
127 """
128
129 def event_type_field(self) -> str:
130 """
131 Returns the field name for the event type.
132 """
133 return "event.code"
134
135 def depends_on(self) -> list[str]:
136 """
137 Returns a list of plugins this plugin depends on.
138 """
139 return []
140
141 def run_command(self, p: GulpPluginParams) -> dict:
142 """
143 Runs a custom command
144 """
145 return None
146
147 def internal(self) -> bool:
148 """
149 Returns whether the plugin is for internal use only
150 """
151 return False
152
153 def tags(self) -> list[str]:
154 """
155 returns a list of tags for the plugin. Tags are used to aid filtering of plugins/query filters in the UI.
156 - "event"
157 - "network"
158 - "file"
159 - "process"
160 - "threat"
161 - "threat.enrichments"
162 - ...
163 """
164 return []
165
167 self, custom_mapping: GulpMapping, plugin_params: GulpPluginParams = None
168 ) -> tuple[GulpMapping, GulpPluginParams]:
169 """
170 Process the plugin parameters checking parameters as `timestamp_field`, ... and update the custom mapping accordingly.
171
172 Args:
173 custom_mapping (GulpMapping): The custom mapping provided: if it is not empty, it will be used as is (plugin_params will be ignored)
174 plugin_params (GulpPluginParams, optional): The plugin parameters. Defaults to None.
175
176 Returns:
177 tuple[GulpMapping, GulpPluginParams]: The updated custom mapping and plugin parameters.
178 """
179 if plugin_params is None:
180 plugin_params = GulpPluginParams()
181
182 if len(custom_mapping.fields) > 0:
183 # custom_mapping provided
184 return custom_mapping, plugin_params
185
186 self.logger().warning("no custom mapping provided")
187
188 tf = plugin_params.timestamp_field
189 if tf is not None:
190 self.logger().debug("using timestamp_field=%s" % (tf))
191 # build a proper custom_mapping with just timestamp
192 custom_mapping.fields[tf] = FieldMappingEntry(is_timestamp=True)
193 # self.logger().debug(custom_mapping)
194 return custom_mapping, plugin_params
195
197 self,
198 pipeline: ProcessingPipeline = None,
199 mapping_file: str = None,
200 mapping_id: str = None,
201 product: str = None,
202 plugin_params: GulpPluginParams = None,
203 ) -> ProcessingPipeline:
204 """
205 Initializes the Sigma plugin to convert sigma rules YAML to elasticsearch DSL query.
206
207 Args:
208 pipeline (ProcessingPipeline, optional): The processing pipeline. Defaults to None (empty pipeline, plugin must fill it).
209 mapping_file (str, optional): The name of the mapping file (i.e. 'windows.json') in the gulp/mapping_files directory. Defaults to None (use pipeline mapping only).
210 mapping_id (str, optional): The mapping ID (["options"]["mapping_id"] in the mapping file, to get a specific mapping). Defaults to None (use first).
211 product (str, optional): The product. Defaults to None.
212 plugin_params (GulpPluginParams, optional): The plugin parameters, to override i.e. mapping_file_path, mapping_id, ... Defaults to None.
213 Returns:
214 ProcessingPipeline: The initialized processing pipeline.
215 """
216 self.logger().debug("INITIALIZING SIGMA plugin=%s" % (self.name()))
217
218 mapping_file_path = None
219 if mapping_file is not None:
220 # get path in mapping directory
221 mapping_file_path = gulp_utils.build_mapping_file_path(mapping_file)
222
223 if plugin_params is not None:
224 # override with plugin_params
225 if plugin_params.mapping_file is not None:
226 mapping_file_path = gulp_utils.build_mapping_file_path(
227 plugin_params.mapping_file
228 )
229 if plugin_params.mapping_id is not None:
230 mapping_id = plugin_params.mapping_id
231
232 p = await mappings_helper.get_enriched_pipeline(
233 pipeline=pipeline,
234 mapping_file_path=mapping_file_path,
235 mapping_id=mapping_id,
236 product=product,
237 )
238 return p
239
241 self,
242 index: str,
243 record: any,
244 record_idx: int,
245 my_record_to_gulp_document_fun: Callable,
246 ws_id: str,
247 req_id: str,
248 operation_id: int,
249 client_id: int,
250 context: str,
251 source: str,
252 fs: TmpIngestStats,
253 custom_mapping: GulpMapping = None,
254 index_type_mapping: dict = None,
255 plugin: str = None,
256 plugin_params: GulpPluginParams = None,
257 flt: GulpIngestionFilter = None,
258 **kwargs,
259 ) -> tuple[TmpIngestStats, bool]:
260 """
261 Process a record for ingestion, updating ingestion stats.
262 Args:
263 index (str): The index to ingest the record into.
264 record (any): The record to process.
265 record_idx (int): The index of the record as in the source.
266 my_record_to_gulp_document_fun (Callable): The function (for this plugin) to convert the record to one or more GulpDocument/s.
267 ws_id (str): The websocket ID
268 req_id (str): The request ID.
269 operation_id (int): The operation ID.
270 client_id (int): The client ID.
271 context (str): The context of the record.
272 source (str): The source of the record.
273 fs (TmpIngestStats): The temporary ingestion statistics.
274 custom_mapping (GulpMapping, optional): The custom mapping for the record. Defaults to None.
275 index_type_mapping (dict, optional): The elastic index type mapping. Defaults to None.
276 plugin (str, optional): The plugin name. Defaults to None.
277 plugin_params (GulpPluginParams, optional): The plugin parameters. Defaults to None.
278 flt (GulpIngestionFilter, optional): The ingestion filter. Defaults to None.
279 **kwargs: Additional keyword arguments.
280 Returns:
281 tuple[TmpIngestStats, bool]: A tuple containing the updated temporary ingestion statistics and a flag indicating whether to break the ingestion process.
282 """
283
284 # convert record to one or more GulpDocument objects
285 docs = await self._call_record_to_gulp_document_funcs(
286 operation_id=operation_id,
287 client_id=client_id,
288 context=context,
289 source=source,
290 fs=fs,
291 record=record,
292 record_idx=record_idx,
293 custom_mapping=custom_mapping,
294 index_type_mapping=index_type_mapping,
295 plugin=plugin,
296 plugin_params=plugin_params,
297 record_to_gulp_document_fun=my_record_to_gulp_document_fun,
298 **kwargs,
299 )
300 # ingest record
301 for d in docs:
302 fs = await self._ingest_record(index, d, fs, ws_id, req_id, flt, **kwargs)
303
304 status, _ = await GulpStats.update(
305 self.collab,
306 req_id,
307 ws_id,
308 fs=fs.update(processed=len(docs)),
309 )
310 must_break = False
311 if status in [GulpRequestStatus.FAILED, GulpRequestStatus.CANCELED]:
312 must_break = True
313
314 return fs, must_break
315
317 self,
318 index: str,
319 source: str | dict,
320 skip_mapping: bool = False,
321 pipeline: ProcessingPipeline = None,
322 mapping_file: str = None,
323 mapping_id: str = None,
324 plugin_params: GulpPluginParams = None,
325 ) -> tuple[dict, GulpMapping]:
326 """
327 Initializes the ingestion plugin.
328
329 Args:
330 index (str): The name of the elasticsearch index.
331 source (str|dict): The source of the record (source file name or path, usually. may also be a dictionary.).
332 skip_mapping (bool, optional): Whether to skip mapping initialization (just prints source and plugin name, and return empty index mapping and custom mapping). Defaults to False.
333 pipeline (ProcessingPipeline, optional): The psyigma pipeline to borrow the mapping from, if any. Defaults to None (use mapping file only).
334 mapping_file (str, optional): name of the mapping file (i.e. 'windows.json') in the gulp/mapping_files directory. Defaults to None (use pipeline mapping only).
335 mapping_id (str, optional): The mapping ID (options.mapping_id) in the mapping file, to get a specific mapping. Defaults to None (use first).
336 plugin_params (GulpPluginParams, optional): The plugin parameters (i.e. to override mapping_file, mapping_id, ...). Defaults to None.
337 Returns:
338 tuple[dict, GulpMapping]: A tuple containing the elasticsearch index type mappings and the enriched GulpMapping (or an empty GulpMapping is no valid pipeline and/or mapping_file are provided).
339
340 """
341 # self.logger().debug("ingest_plugin_initialize: index=%s, pipeline=%s, mapping_file=%s, mapping_id=%s, plugin_params=%s" % (index, pipeline, mapping_file, mapping_id, plugin_params))
342 self.logger().debug(
343 "INITIALIZING INGESTION for source=%s, plugin=%s"
344 % (muty.string.make_shorter(source, 260), self.name())
345 )
346 if skip_mapping:
347 return {}, GulpMapping()
348
349 # get path of the mapping file in gulp/mapping_files folder
350 mapping_file_path = None
351 if mapping_file is not None:
352 mapping_file_path = gulp_utils.build_mapping_file_path(mapping_file)
353
354 if plugin_params is not None:
355 # override with plugin_params
356 if plugin_params.mapping_file is not None:
357 mapping_file_path = gulp_utils.build_mapping_file_path(
358 plugin_params.mapping_file
359 )
360 if plugin_params.mapping_id is not None:
361 mapping_id = plugin_params.mapping_id
362 if plugin_params.pipeline is not None:
363 pipeline = plugin_params.pipeline
364
365 index_type_mappings = await elastic_api.index_get_mapping(
366 elastic_api.elastic(), index, False
367 )
368 # index_type_mappings = await elastic_api.datastream_get_mapping(self.elastic, index + '-template')
369 m: GulpMapping = await mappings_helper.get_enriched_mapping_for_ingestion(
370 pipeline=pipeline,
371 mapping_file_path=mapping_file_path,
372 mapping_id=mapping_id,
373 )
374 return index_type_mappings, m
375
377 self,
378 operation_id: int,
379 client_id: int,
380 context: str,
381 source: str,
382 fs: TmpIngestStats,
383 record: any,
384 record_idx: int,
385 custom_mapping: GulpMapping = None,
386 index_type_mapping: dict = None,
387 plugin: str = None,
388 plugin_params: GulpPluginParams = None,
389 record_to_gulp_document_fun: Callable = None,
390 **kwargs,
391 ) -> list[GulpDocument]:
392 """Stub function to call stacked plugins record_to_document_gulp_document.
393 Each function is called with the previously returned GulpDocument.
394
395 Args:
396 operation_id (int): the operation ID associated with the record
397 client_id (int): client ID performing the ingestion
398 context (str): context associated with the record
399 source (str): source of the record (source file name or path, usually)
400 fs (TmpIngestStats): _description_
401 record (any): a single record (first time) or a list of GulpDocument objects (in stacked plugins)
402 record_idx (int): The index of the record in source.
403 custom_mapping (GulpMapping, optional): The custom mapping to use for the conversion. Defaults to None.
404 index_type_mapping (dict, optional): elastic search index type mappings { "field": "type", ... }. Defaults to None.
405 plugin (str, optional): "agent.type" to be set in the GulpDocument. Defaults to None.
406 plugin_params (GulpPluginParams, optional): The plugin parameters to use, if any. Defaults to None.
407 record_to_gulp_document_fun (Callable, optional): function to parse record into a gulp document, if stacked this receives a list of GulpDocuments
408
409 Returns:
410 list[GulpDocument]: zero or more GulpDocument objects
411 """
412 # plugin_params=deepcopy(plugin_params)
413
414 if plugin_params is None:
415 plugin_params = GulpPluginParams()
416
417 docs = record
418
419 if record_to_gulp_document_fun is not None:
420 docs = await record_to_gulp_document_fun(
421 operation_id,
422 client_id,
423 context,
424 source,
425 fs,
426 record,
427 record_idx,
428 custom_mapping,
429 index_type_mapping,
430 plugin,
431 plugin_params,
432 **kwargs,
433 )
434
435 for fun in plugin_params.record_to_gulp_document_fun:
436 docs = await fun(
437 operation_id,
438 client_id,
439 context,
440 source,
441 fs,
442 docs,
443 record_idx,
444 custom_mapping,
445 index_type_mapping,
446 plugin,
447 plugin_params,
448 **kwargs,
449 )
450
451 if docs is None:
452 return []
453 return docs
454
456 self,
457 operation_id: int,
458 client_id: int,
459 context: str,
460 source: str,
461 fs: TmpIngestStats,
462 record: any,
463 record_idx: int,
464 custom_mapping: GulpMapping = None,
465 index_type_mapping: dict = None,
466 plugin: str = None,
467 plugin_params: GulpPluginParams = None,
468 **kwargs,
469 ) -> list[GulpDocument]:
470 """
471 Converts a record to one or more GulpDocument objects based on the provided index mappings.
472
473 Args:
474 operation_id (int): The operation ID associated with the record.
475 client_id (int): The client ID associated with the record.
476 context (str): The context associated with the record.
477 source (str): The source of the record (source file name or path, usually).
478 fs (TmpIngestStats): The temporary ingestion statistics (may be updated on return).
479 record (any): record to convert, plugin dependent format: note that here stacked plugins receives a list of GulpDocument objects instead (since the original record may generate one or more documents).
480 record_idx (int): The index of the record in source.
481 custom_mapping (GulpMapping, optional): The custom mapping to use for the conversion. Defaults to None.
482 index_type_mapping (dict, optional): elastic search index type mappings { "ecs_field": "type", ... }. Defaults to None.
483 plugin (str, optional): "agent.type" to be set in the GulpDocument. Defaults to None.
484 plugin_params (GulpPluginParams, optional): The plugin parameters to use, if any. Defaults to None.
485 extra (dict, optional): Additional fields to add to the GulpDocument (after applying mapping). Defaults to None.
486 **kwargs: Additional keyword arguments:
487
488 Returns:
489 list[GulDocument]: The converted GulpDocument objects or None if an exception occurred (fs is updated then).
490
491 Raises:
492 NotImplementedError: This method is not implemented yet.
493 """
494 raise NotImplementedError("not implemented!")
495
496 async def ingest(
497 self,
498 index: str,
499 req_id: str,
500 client_id: int,
501 operation_id: int,
502 context: str,
503 source: str | list[dict],
504 ws_id: str,
505 plugin_params: GulpPluginParams = None,
506 flt: GulpIngestionFilter = None,
507 **kwargs,
508 ) -> GulpRequestStatus:
509 """
510 Ingests a file using the plugin.
511
512 NOTE: implementers should call super().ingest() in their implementation.
513 NOTE: this function *SHOULD NOT* raise exceptions
514
515 Args:
516 index (str): name of the elasticsearch index to ingest the document into.
517 req_id (str): The request ID related to this ingestion (must exist on the collab db).
518 client_id (int): The client ID performing the ingestion.
519 operation_id (int): The operation ID related to this ingestion.
520 context (str): Context related to this ingestion.
521 source (str|list[dict]): The path to the file to ingest, or a list of events dicts.
522 ws_id (str): The websocket ID
523 plugin_params (GulpPluginParams): additional parameters to pass to the ingestion function. Defaults to None.
524 flt (GulpIngestionFilter, optional): filter to apply to this ingestion, if any. Defaults to None.
525 kwargs: additional arguments if any
526 """
527 self.req_id = req_id
528 self.client_id = client_id
529 self.operation_id = operation_id
530 self.context = context
531 self.ws_id = ws_id
532 self.index = index
533 # raise NotImplementedError("not implemented!")
534
535 async def pipeline(
536 self, plugin_params: GulpPluginParams = None, **kwargs
537 ) -> ProcessingPipeline:
538 """
539 Returns the pysigma processing pipeline for the plugin, if any.
540
541 Args:
542 plugin_params (GulpPluginParams, optional): additional parameters to pass to the pipeline. Defaults to None.
543 kwargs: additional arguments if any.
544 Returns:
545 ProcessingPipeline: The processing pipeline.
546 """
547 raise NotImplementedError("not implemented!")
548
549 def cleanup(self) -> None:
550 """
551 Optional cleanup routine to call on unload.
552 """
553 return
554
556 self,
557 fme: list[FieldMappingEntry],
558 idx: int,
559 operation_id: int,
560 context: str,
561 plugin: str,
562 client_id: int,
563 raw_event: str,
564 original_id: str,
565 src_file: str,
566 timestamp: int = None,
567 timestamp_nsec: int = None,
568 event_code: str = None,
569 cat: list[str] = None,
570 duration_nsec: int = 0,
571 gulp_log_level: GulpLogLevel = None,
572 original_log_level: str = None,
573 remove_raw_event: bool = False,
574 **kwargs,
575 ) -> list[GulpDocument]:
576 """
577 build one or more GulpDocument objects from a list of FieldMappingEntry objects:
578
579 this function creates as many GulpDocument objects as there are FieldMappingEntry objects with is_timestamp=True.
580 if no FieldMappingEntry object has is_timestamp=True, it creates a single GulpDocument object with the first FieldMappingEntry object.
581 """
582 docs: list[GulpDocument] = []
583 append_doc = docs.append # local variable for faster access
584
585 common_params = {
586 "idx": idx,
587 "operation_id": operation_id,
588 "context": context,
589 "plugin": plugin,
590 "client_id": client_id,
591 "raw_event": raw_event,
592 "original_id": original_id,
593 "src_file": src_file,
594 "timestamp": timestamp,
595 "timestamp_nsec": timestamp_nsec,
596 "event_code": event_code,
597 "cat": cat,
598 "duration_nsec": duration_nsec,
599 "gulp_log_level": gulp_log_level,
600 "original_log_level": original_log_level,
601 **kwargs,
602 }
603 for f in fme:
604 # print("%s\n\n" % (f))
605 # for each is_timestamp build a gulpdocument with all the fields in fme
606 if f.is_timestamp:
607 d = GulpDocument(fme=fme, f=f, **common_params)
608 if remove_raw_event:
609 d.original_event = None
610
611 # print("%s\n\n" % (d))
612 append_doc(d)
613
614 if len(docs) == 0:
615 # create a document with the given timestamp in timestamp/timestamp_nsec (if any, either it will be set to 0/invalid)
616 d = GulpDocument(fme=fme, **common_params)
617 if remove_raw_event:
618 d.original_event = None
619 append_doc(d)
620
621 return docs
622
623 def get_unmapped_field_name(self, field: str) -> str:
624 """
625 Returns the name of the unmapped field.
626
627 Parameters:
628 - field (str): The name of the field.
629
630 Returns:
631 - str: The name of the unmapped field.
632 """
633 if not elastic_api.UNMAPPED_PREFIX:
634 return field
635
636 return f"{elastic_api.UNMAPPED_PREFIX}.{field}"
637
638 def _type_checks(self, v: any, k: str, index_type_mapping: dict) -> any:
639 """
640 check if the value should be fixed based on the index type mapping
641
642 Args:
643 v (any): The value to check.
644 k (str): The mapped field (i.e. "user.id", may also be an unmapped (i.e. "gulp.unmapped") field)
645 index_type_mapping (dict): The elasticsearch index key->type mappings.
646 """
647 if k not in index_type_mapping:
648 # _logger.debug("key %s not found in index_type_mapping" % (k))
649 return str(v)
650
651 index_type = index_type_mapping[k]
652 if index_type == "long":
653 # self.logger().debug("converting %s:%s to long" % (k, v))
654 if isinstance(v, str):
655 if v.isnumeric():
656 return int(v)
657 if v.startswith("0x"):
658 return int(v, 16)
659 return v
660
661 if index_type == "date" and isinstance(v, str) and v.lower().startswith("0x"):
662 # convert hex to int
663 return int(v, 16)
664
665 if index_type == "keyword" or index_type == "text":
666 # self.logger().debug("converting %s:%s to keyword" % (k, v))
667 return str(v)
668
669 if index_type == "ip":
670 # self.logger().debug("converting %s:%s to ip" % (k, v))
671 if "local" in v.lower():
672 return "127.0.0.1"
673 try:
674 ipaddress.ip_address(v)
675 except ValueError as ex:
676 _logger.exception(ex)
677 return None
678
679 # add more types here if needed ...
680 # self.logger().debug("returning %s:%s" % (k, v))
681 return str(v)
682
684 self,
685 plugin_params: GulpPluginParams,
686 custom_mapping: GulpMapping,
687 source_key: str,
688 v: Any,
689 index_type_mapping: dict = None,
690 ignore_custom_mapping: bool = False,
691 **kwargs,
692 ) -> list[FieldMappingEntry]:
693 """
694 map source key to a field mapping entry with "result": {mapped_key: v}
695
696 Args:
697 plugin_params (GulpPluginParams): The plugin parameters.
698 custom_mapping (GulpMapping): The custom mapping.
699 source_key (str): The key to look for(=the event record key to be mapped) in the custom_mapping dictionary
700 v (any): value to set for mapped key/s.
701 index_type_mapping (dict, optional): The elasticsearch index key->type mappings. Defaults to None.
702 ignore_custom_mapping (bool, optional): Whether to ignore custom_mapping and directly map source_key to v. Defaults to False.
703 kwargs: Additional keyword arguments.
704
705 Returns:
706 list[FieldMappingEntry]: zero or more FieldMappingEntry objects with "result" set.
707 """
708 # get mapping and option from custom_mapping
709 if index_type_mapping is None:
710 index_type_mapping = {}
711 # _logger.debug('len index type mapping=%d' % (len(index_type_mapping)))
712 mapping_dict: dict = custom_mapping.fields
713 mapping_options = (
714 custom_mapping.options
715 if custom_mapping.options is not None
716 else GulpMappingOptions()
717 )
718
719 # basic checks
720 if v == "-" or v is None:
721 return []
722
723 if isinstance(v, str):
724 v = v.strip()
725 if not v and mapping_options.ignore_blanks:
726 # not adding blank strings
727 return []
728
729 # fix value if needed, and add to extra
730 if ignore_custom_mapping or (
731 plugin_params is not None and plugin_params.ignore_mapping_ingest
732 ):
733 # direct mapping, no need to check custom_mappings
734 return [FieldMappingEntry(result={source_key: v})]
735
736 if source_key not in mapping_dict:
737 # self.logger().error('key "%s" not found in custom mapping, mapping_dict=%s!' % (source_key, muty.string.make_shorter(str(mapping_dict))))
738 # key not found in custom_mapping, check if we have to map it anyway
739 if not mapping_options.ignore_unmapped:
740 return [
741 FieldMappingEntry(
742 result={self.get_unmapped_field_name(source_key): str(v)}
743 )
744 ]
745
746 # there is a mapping defined to be processed
747 fm: FieldMappingEntry = mapping_dict[source_key]
748 map_to_list = (
749 [fm.map_to] if isinstance(fm.map_to, (str, type(None))) else fm.map_to
750 )
751
752 # in the end, this function will return a list of FieldMappingEntry objects with "result" set: these results will be used to create the GulpDocument object
753 fme_list: list[FieldMappingEntry] = []
754 for k in map_to_list:
755 # make a copy of fme without using deepcopy)
756 dest_fm = FieldMappingEntry(
757 is_timestamp=fm.is_timestamp,
758 event_code=fm.event_code,
759 do_multiply=fm.do_multiply,
760 is_timestamp_chrome=fm.is_timestamp_chrome,
761 is_variable_mapping=fm.is_variable_mapping,
762 result={},
763 )
764
765 # check if it is a number and/or a timestamp (including chrome timestamp, which is a special case)
766 is_numeric = isinstance(v, int) or str(v).isnumeric()
767 if is_numeric:
768 v = int(v)
769 # ensure chrome timestamp is properly converted to nanos
770 # _logger.debug('***** is_numeric, v=%d' % (v))
771 if fm.is_timestamp_chrome:
772 v = int(muty.time.chrome_epoch_to_nanos(v))
773 # _logger.debug('***** is_timestamp_chrome, v nsec=%d' % (v))
774
775 if fm.do_multiply is not None:
776 # apply a multipler if any (must turn v to nanoseconds)
777 # _logger.debug("***** is_numeric, multiply, v=%d" % (v))
778 v = int(v * fm.do_multiply)
779 # _logger.debug("***** is_numeric, AFTER multiply, v=%d" % (v))
780
781 elif isinstance(v, str) and fm.is_timestamp:
782 v = int(
783 muty.time.string_to_epoch_nsec(
784 v,
785 utc=mapping_options.timestamp_utc,
786 dayfirst=mapping_options.timestamp_dayfirst,
787 yearfirst=mapping_options.timestamp_yearfirst,
788 )
789 )
790 # _logger.debug('***** str and is_timestamp, v nsec=%d' % (v))
791 if fm.is_timestamp:
792 # it's a timestamp, another event will be generated
793 vv = muty.time.nanos_to_millis(v)
794 dest_fm.result["@timestamp"] = vv
795 dest_fm.result["@timestamp_nsec"] = v
796 # _logger.debug('***** timestamp nanos, v=%d' % (v))
797 # _logger.debug('***** timestamp to millis, v=%d' % (vv))
798
799 if fm.is_timestamp or fm.is_timestamp_chrome:
800 # _logger.debug('***** timestamp or timestamp_chrome, v=%d' % (v))
801 if v < 0:
802 # _logger.debug('***** adding invalid timestamp')
803 v = 0
804 GulpDocument.add_invalid_timestamp(dest_fm.result)
805 if k is not None:
806 # also add to mapped key
807 dest_fm.result[k] = v
808 else:
809 # not a timestamp, map
810 if k is None:
811 # add unmapped
812 k = self.get_unmapped_field_name(source_key)
813 else:
814 v = self._type_checks(v, k, index_type_mapping)
815 dest_fm.result[k] = v
816
817 fme_list.append(dest_fm)
818 """
819 self.logger().debug('FME LIST FOR THIS RECORD:')
820 for p in fme_list:
821 self.logger().debug(p)
822 self.logger().debug('---------------------------------')
823 """
824 return fme_list
825
827 self, docs: list[dict], flt: GulpIngestionFilter = None
828 ) -> list[dict]:
829 """
830 Builds the ingestion chunk for the websocket, filtering if needed.
831 """
832 # self.logger().debug("building ingestion chunk, flt=%s" % (flt))
833 if not docs:
834 return []
835
836 ws_docs = [
837 {
838 "_id": doc["_id"],
839 "@timestamp": doc["@timestamp"],
840 "gulp.source.file": doc["gulp.source.file"],
841 "event.duration": doc["event.duration"],
842 "gulp.context": doc["gulp.context"],
843 "gulp.log.level": doc.get("gulp.log.level", int(GulpLogLevel.INFO)),
844 "event.category": doc.get("event.category", None),
845 "event.code": doc["event.code"],
846 "gulp.event.code": doc["gulp.event.code"],
847 }
848 for doc in docs
849 if elastic_api.filter_doc_for_ingestion(
850 doc, flt, ignore_store_all_documents=True
851 )
852 == GulpEventFilterResult.ACCEPT
853 ]
854
855 return ws_docs
856
857 async def _flush_buffer(
858 self,
859 index: str,
860 fs: TmpIngestStats,
861 ws_id: str,
862 req_id: str,
863 flt: GulpIngestionFilter = None,
864 wait_for_refresh: bool = False,
865 ) -> TmpIngestStats:
866 """
867 NOTE: errors appended by this function are intended as INGESTION errors:
868 it means something wrong with the format of the event, and must be fixed ASAP if this happens.
869 ideally, function should NEVER append errors and the errors total should be the same before and after this function returns (this function may only change the skipped total, which means some duplicates were found).
870 """
871 if len(self.buffer) == 0:
872 # already flushed
873 return fs
874
875 # _logger.debug('flushing ingestion buffer, len=%d' % (len(self.buffer)))
876
877 skipped, failed, failed_ar, ingested_docs = await elastic_api.ingest_bulk(
878 self.elastic, index, self.buffer, flt=flt, wait_for_refresh=wait_for_refresh
879 )
880 # print(json.dumps(ingested_docs, indent=2))
881
882 if failed > 0:
883 if config.debug_abort_on_elasticsearch_ingestion_error():
884 raise Exception(
885 "elasticsearch ingestion errors means GulpDocument contains invalid data, review errors on collab db!"
886 )
887
888 self.buffer = []
889
890 # build ingestion chunk
891 ws_docs = self._build_ingestion_chunk_for_ws(ingested_docs, flt)
892
893 # send ingested docs to websocket
894 if len(ws_docs) > 0:
895 ws_api.shared_queue_add_data(
896 WsQueueDataType.INGESTION_CHUNK,
897 req_id,
898 {"plugin": self.name(), "events": ws_docs},
899 ws_id=ws_id,
900 )
901
902 # update stats
903 fs = fs.update(
904 failed=failed,
905 skipped=skipped,
906 ingest_errors=failed_ar,
907 )
908 return fs
909
910 async def _ingest_record(
911 self,
912 index: str,
913 doc: GulpDocument | dict,
914 fs: TmpIngestStats,
915 ws_id: str,
916 req_id: str,
917 flt: GulpIngestionFilter = None,
918 flush_enabled: bool = True,
919 **kwargs,
920 ) -> TmpIngestStats:
921 """
922 bufferize as much as ingestion_buffer_size, then flush (writes to elasticsearch)
923 """
924 ingestion_buffer_size = config.config().get("ingestion_buffer_size", 1000)
925 self.buffer.append(doc)
926 if len(self.buffer) >= ingestion_buffer_size and flush_enabled:
927 # time to flush
928 fs = await self._flush_buffer(index, fs, ws_id, req_id, flt)
929
930 return fs
931
933 self, fs: TmpIngestStats, source: str | dict, ex: Exception | str
934 ) -> TmpIngestStats:
935 """
936 whole source failed ingestion (error happened before the record parsing loop), helper to update stats
937 """
938 self.logger().exception(
939 "PARSER FAILED: source=%s, ex=%s"
940 % (muty.string.make_shorter(str(source), 260), ex)
941 )
942 fs = fs.update(ingest_errors=[ex], parser_failed=1)
943 return fs
944
def _record_failed(
    self, fs: TmpIngestStats, entry: Any, source: str | dict, ex: Exception | str
) -> TmpIngestStats:
    """
    Helper to update the stats when a single record failed ingestion
    (in the record parser loop).

    Args:
        fs (TmpIngestStats): The temporary ingestion statistics.
        entry (Any): The entry that failed (currently unused).
        source (str | dict): The source of the record (currently unused).
        ex (Exception | str): The error that caused the failure.

    Returns:
        TmpIngestStats: The value returned by fs.update() after counting one
            failed record and adding ex to the ingestion errors.
    """
    # nothing is logged here: only the failure counter and the error list are updated
    return fs.update(failed=1, ingest_errors=[ex])
962
async def _finish_ingestion(
    self,
    index: str,
    source: str | dict,
    req_id: str,
    client_id: int,
    ws_id: str,
    fs: TmpIngestStats,
    flt: GulpIngestionFilter = None,
) -> GulpRequestStatus:
    """
    To be called whenever ingest() must exit: flushes the buffer and updates the ingestion stats.

    Args:
        index (str): forwarded to _flush_buffer.
        source (str | dict): the ingested source, only used for logging.
        req_id (str): forwarded to _flush_buffer and GulpStats.update.
        client_id (int): only used for logging.
        ws_id (str): forwarded to _flush_buffer and GulpStats.update.
        fs (TmpIngestStats): the temporary ingestion statistics.
        flt (GulpIngestionFilter, optional): forwarded to _flush_buffer.

    Returns:
        GulpRequestStatus: the status returned by GulpStats.update().
    """
    try:
        # finally flush ingestion buffer
        fs = await self._flush_buffer(index, fs, ws_id, req_id, flt)
        self.logger().info(
            "INGESTION DONE FOR source=%s,\n\tclient_id=%d (processed=%d, failed=%d, skipped=%d, errors=%d, parser_errors=%d)"
            % (
                muty.string.make_shorter(str(source), 260),
                client_id,
                fs.ev_processed,
                fs.ev_failed,
                fs.ev_skipped,
                len(fs.ingest_errors),
                fs.parser_failed,
            )
        )
    except Exception as ex:
        # a failed flush is recorded in the stats instead of being raised
        fs = fs.update(ingest_errors=[ex])
        self.logger().exception(
            "FAILED finalizing ingestion for source=%s"
            % (muty.string.make_shorter(str(source), 260))
        )
    finally:
        # always push the final stats, whatever happened above
        status, _ = await GulpStats.update(
            self.collab,
            req_id,
            ws_id,
            fs=fs,
            force=True,
            file_done=True,
        )

    # returning here (and not inside "finally") lets exceptions that are not
    # caught above (e.g. asyncio.CancelledError) propagate instead of being
    # silently discarded
    return status
1008
1009
def _get_plugin_path(plugin: str, **kwargs) -> str:
    """
    Resolve a plugin name to the path of its file in the plugins directory.

    The plain ".py" file is preferred, the ".pyc" file is the fallback.

    Args:
        plugin (str): The plugin name, without extension.
        kwargs: additional arguments:
            "plugin_type" (GulpPluginType, default=GulpPluginType.INGESTION)
    Returns:
        str: The path of the existing plugin file.

    Raises:
        ObjectNotFound: If neither the ".py" nor the ".pyc" file exists.
    """
    # TODO: on license manager, disable plain .py load (only encrypted pyc)
    # the base directory depends on the plugin type
    plugin_type = kwargs.get("plugin_type", GulpPluginType.INGESTION)
    base_dir = config.path_plugins(plugin_type)

    # try plain .py first
    source_path = muty.file.safe_path_join(base_dir, plugin + ".py")
    if muty.file.exists(source_path):
        return source_path

    # try pyc
    compiled_path = muty.file.safe_path_join(base_dir, plugin + ".pyc")
    if muty.file.exists(compiled_path):
        return compiled_path

    raise ObjectNotFound(
        "plugin %s not found (tried %s, %s)" % (plugin, source_path, compiled_path)
    )
1028
1029
def load_plugin(
    plugin: str,
    collab: AsyncEngine = None,
    elastic: AsyncElasticsearch = None,
    **kwargs,
) -> PluginBase:
    """
    Load a plugin from a given path or from the default plugin path.

    The plugin module is taken from the process's plugin cache when available,
    otherwise it is loaded from file and added to the cache. In both cases a new
    Plugin instance is created with the resolved plugin path.

    Args:
        plugin (str): The name or path of the plugin to load (a value containing "/" is treated as a path).
        collab (AsyncEngine, optional): The SQLAlchemy engine instance. Defaults to None (not needed for pysigma plugins).
        elastic (AsyncElasticsearch, optional): The Elasticsearch client instance. Defaults to None (not needed for pysigma plugins).
        kwargs: additional arguments, also forwarded to the Plugin constructor:
            "plugin_type" (GulpPluginType, default=GulpPluginType.INGESTION)
    Returns:
        PluginBase: The loaded plugin.

    Raises:
        ObjectNotFound: If plugin is a name and no matching file exists.
        Exception: If the plugin could not be loaded.
    """
    _logger.debug(
        "load_plugin %s, collab=%s, elastic=%s ..." % (plugin, collab, elastic)
    )

    # resolve the path first, so cached and freshly loaded plugins are both
    # instantiated with the same "path" argument
    if "/" in plugin:
        # plugin is a path
        path = muty.file.abspath(plugin)
    else:
        # uses "plugin_type" to load from the correct subfolder
        path = _get_plugin_path(plugin, **kwargs)

    m = plugin_cache_get(plugin)
    if m is not None:
        return m.Plugin(path, collab, elastic, **kwargs)

    try:
        m = muty.dynload.load_dynamic_module_from_file(path)
    except Exception as ex:
        raise Exception(
            "load_dynamic_module_from_file() failed, could not load plugin %s !"
            % (path)
        ) from ex

    mod: PluginBase = m.Plugin(path, collab, elastic, **kwargs)
    _logger.debug("loaded plugin: %s" % (mod.name()))

    # the module is cached only once it has been instantiated successfully
    plugin_cache_add(m, plugin)

    return mod
1080
1081
async def list_plugins() -> list[dict]:
    """
    List all available plugins.

    Every file matching "*.py*" found (recursively) in the plugins directory is
    loaded, described and unloaded again. Files which fail to load or to be
    described are logged and skipped.

    Returns:
        list[dict]: The list of available plugins, one dict per plugin.
    """
    path_plugins = config.path_plugins()
    files = await muty.file.list_directory_async(path_plugins, "*.py*", recursive=True)
    plugins = []
    for f in files:
        if "__init__" in f or "__pycache__" in f:
            continue

        try:
            p = load_plugin(f)
            try:
                plugins.append(
                    {
                        "name": p.name(),
                        "type": str(p.type()),
                        "desc": p.desc(),
                        "filename": os.path.basename(p.path),
                        "internal": p.internal(),
                        "options": [o.to_dict() for o in p.options()],
                        "depends_on": p.depends_on(),
                        "tags": p.tags(),
                        "event_type_field": p.event_type_field(),
                        "version": p.version(),
                    }
                )
            finally:
                # unload even when describing the plugin failed
                unload_plugin(p)
        except Exception:
            # best effort: a broken plugin must not prevent listing the others
            _logger.exception("could not load plugin %s" % (f))

    return plugins
1116
1117
async def get_plugin_tags(plugin: str) -> list[str]:
    """
    Get the tags for a given (ingestion) plugin.

    The plugin is loaded with the default plugin type and unloaded again once
    its tags have been read.

    Args:
        plugin (str): The name of the plugin to get the tags for.
    Returns:
        list[str]: The tags for the given plugin.

    Raises:
        Exception: If the plugin could not be loaded.
    """
    p = load_plugin(plugin)
    try:
        return p.tags()
    finally:
        # do not leave the plugin instance loaded
        unload_plugin(p)
1130
1131
def unload_plugin(mod: PluginBase) -> None:
    """
    Unloads a plugin by calling its `cleanup` method.

    This is a no-op when the plugin cache is enabled or when mod is None.

    NOTE: mod should be considered **no more valid** after this function returns.

    Args:
        mod (PluginBase): The plugin to unload.

    Returns:
        None
    """
    if config.plugin_cache_enabled() or mod is None:
        return

    _logger.debug("unloading plugin: %s" % (mod.name()))
    mod.cleanup()
1155
1156
def plugin_cache_clear() -> None:
    """
    Clear the process's own plugin cache.

    Nothing happens when the plugin cache is disabled.

    Returns:
        None
    """
    global _cache
    if config.plugin_cache_enabled():
        # start over with a brand new dict
        _cache = {}
1169
1170
def plugin_cache_remove(plugin: str) -> None:
    """
    Remove a plugin from the process's own plugin cache.

    Nothing happens when the plugin cache is disabled or the plugin is not cached.

    Args:
        plugin (str): The name/path of the plugin to remove from the cache.

    Returns:
        None
    """
    if not config.plugin_cache_enabled():
        return

    if plugin in _cache:
        _logger.debug("removing plugin %s from cache" % (plugin))

        # only the cache entry is dropped
        del _cache[plugin]
1191
1192
def plugin_cache_add(m: ModuleType, name: str) -> None:
    """
    Add a plugin to the process's own plugin cache.

    Nothing happens when the plugin cache is disabled or when a module is
    already cached under the same name.

    Args:
        m (ModuleType): The plugin module to add to the cache.
        name (str): The name/path of the plugin, used as the cache key.

    Returns:
        None
    """
    global _cache
    if not config.plugin_cache_enabled():
        return

    if _cache.get(name) is not None:
        # already cached, keep the existing module
        return

    _logger.debug("adding plugin %s (%s) to cache" % (name, m))
    _cache[name] = m
1212
1213
def plugin_cache_get(plugin: str) -> ModuleType:
    """
    Retrieve a plugin from the process's own plugin cache.

    Args:
        plugin (str): The name/path of the plugin to retrieve.

    Returns:
        ModuleType: The cached plugin module, or None when the plugin cache is
            disabled or the plugin is not cached.
    """
    global _cache
    if not config.plugin_cache_enabled():
        return None

    cached = _cache.get(plugin)
    if cached is None:
        return None

    _logger.debug("found plugin %s in cache" % (plugin))
    return cached
list[str] depends_on(self)
Definition plugin.py:135
dict run_command(self, GulpPluginParams p)
Definition plugin.py:141
ProcessingPipeline sigma_plugin_initialize(self, ProcessingPipeline pipeline=None, str mapping_file=None, str mapping_id=None, str product=None, GulpPluginParams plugin_params=None)
Definition plugin.py:203
GulpRequestStatus _finish_ingestion(self, str index, str|dict source, str req_id, int client_id, str ws_id, TmpIngestStats fs, GulpIngestionFilter flt=None)
Definition plugin.py:972
list[dict] _build_ingestion_chunk_for_ws(self, list[dict] docs, GulpIngestionFilter flt=None)
Definition plugin.py:828
TmpIngestStats _record_failed(self, TmpIngestStats fs, any entry, str|dict source, Exception|str ex)
Definition plugin.py:947
TmpIngestStats _parser_failed(self, TmpIngestStats fs, str|dict source, Exception|str ex)
Definition plugin.py:934
list[GulpDocument] _build_gulpdocuments(self, list[FieldMappingEntry] fme, int idx, int operation_id, str context, str plugin, int client_id, str raw_event, str original_id, str src_file, int timestamp=None, int timestamp_nsec=None, str event_code=None, list[str] cat=None, int duration_nsec=0, GulpLogLevel gulp_log_level=None, str original_log_level=None, bool remove_raw_event=False, **kwargs)
Definition plugin.py:575
list[GulpDocument] _call_record_to_gulp_document_funcs(self, int operation_id, int client_id, str context, str source, TmpIngestStats fs, any record, int record_idx, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, Callable record_to_gulp_document_fun=None, **kwargs)
Definition plugin.py:391
list[GulpDocument] record_to_gulp_document(self, int operation_id, int client_id, str context, str source, TmpIngestStats fs, any record, int record_idx, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, **kwargs)
Definition plugin.py:469
TmpIngestStats _flush_buffer(self, str index, TmpIngestStats fs, str ws_id, str req_id, GulpIngestionFilter flt=None, bool wait_for_refresh=False)
Definition plugin.py:865
GulpPluginType type(self)
Definition plugin.py:106
list[FieldMappingEntry] _map_source_key(self, GulpPluginParams plugin_params, GulpMapping custom_mapping, str source_key, Any v, dict index_type_mapping=None, bool ignore_custom_mapping=False, **kwargs)
Definition plugin.py:692
tuple[dict, GulpMapping] ingest_plugin_initialize(self, str index, str|dict source, bool skip_mapping=False, ProcessingPipeline pipeline=None, str mapping_file=None, str mapping_id=None, GulpPluginParams plugin_params=None)
Definition plugin.py:325
None cleanup(self)
Definition plugin.py:549
GulpRequestStatus ingest(self, str index, str req_id, int client_id, int operation_id, str context, str|list[dict] source, str ws_id, GulpPluginParams plugin_params=None, GulpIngestionFilter flt=None, **kwargs)
Definition plugin.py:508
tuple[TmpIngestStats, bool] _process_record(self, str index, any record, int record_idx, Callable my_record_to_gulp_document_fun, str ws_id, str req_id, int operation_id, int client_id, str context, str source, TmpIngestStats fs, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, GulpIngestionFilter flt=None, **kwargs)
Definition plugin.py:259
tuple[GulpMapping, GulpPluginParams] _process_plugin_params(self, GulpMapping custom_mapping, GulpPluginParams plugin_params=None)
Definition plugin.py:168
str get_unmapped_field_name(self, str field)
Definition plugin.py:623
bool internal(self)
Definition plugin.py:147
ProcessingPipeline pipeline(self, GulpPluginParams plugin_params=None, **kwargs)
Definition plugin.py:537
list[str] tags(self)
Definition plugin.py:153
any _type_checks(self, any v, str k, dict index_type_mapping)
Definition plugin.py:638
str event_type_field(self)
Definition plugin.py:129
None __init__(self, str path, AsyncEngine collab=None, AsyncElasticsearch elastic=None, **kwargs)
Definition plugin.py:66
TmpIngestStats _ingest_record(self, str index, GulpDocument|dict doc, TmpIngestStats fs, str ws_id, str req_id, GulpIngestionFilter flt=None, bool flush_enabled=True, **kwargs)
Definition plugin.py:920
logging.Logger logger(cls)
Definition plugin.py:95
list[GulpPluginOption] options(self)
Definition plugin.py:99
list[str] get_plugin_tags(str plugin)
Definition plugin.py:1118
init(logging.Logger logger)
Definition plugin.py:47
None unload_plugin(PluginBase mod)
Definition plugin.py:1132
None plugin_cache_add(ModuleType m, str name)
Definition plugin.py:1193
None plugin_cache_remove(str plugin)
Definition plugin.py:1171
list[dict] list_plugins()
Definition plugin.py:1082
None plugin_cache_clear()
Definition plugin.py:1157
ModuleType plugin_cache_get(str plugin)
Definition plugin.py:1214
PluginBase load_plugin(str plugin, AsyncEngine collab=None, AsyncElasticsearch elastic=None, **kwargs)
Definition plugin.py:1035
str _get_plugin_path(str plugin, **kwargs)
Definition plugin.py:1010