(g)ULP!
Loading...
Searching...
No Matches
plugin_internal.py
Go to the documentation of this file.
1import json
2from typing import Any, Optional
3
4from pydantic import BaseModel, Field, SkipValidation, model_validator
5
6
class GulpPluginParams(BaseModel):
    """
    parameters for a plugin, to be passed to ingest and query API

    every field has a default, so an empty object is valid. the raw input is
    normalized by the "before" validator (to_py_dict) prior to field
    validation: None becomes an empty dict and a JSON document is decoded.
    """

    mapping_file: Optional[str] = Field(
        None,
        description='mapping file name (in gulp/mapping_files directory) to read "mappings" array from, if any.',
    )

    mapping_id: Optional[str] = Field(
        None,
        description="mapping identifier, i.e. to select this mapping via GulpMappingOptions.",
    )

    config_override: Optional[dict[str, Any]] = Field(
        {}, description="allow to override gulp configuration parameters."
    )
    ignore_mapping_ingest: Optional[bool] = Field(
        False,
        description="ignore mapping when ingesting (to be compatible with OpenSearch Security Analytics).",
    )
    ignore_mapping_sigma_query: Optional[bool] = Field(
        False,
        description="ignore mapping when querying using Sigma rules.",
    )
    timestamp_field: Optional[str] = Field(
        None,
        description="The timestamp field (for, i.e. use the a generic plugin without any mapping)",
    )
    # the two fields below are declared with SkipValidation: whatever is
    # assigned to them is stored without being validated.
    record_to_gulp_document_fun: SkipValidation[Any] = Field(
        [],
        description="INTERNAL USAGE ONLY, to get mapping from (for stacked plugins).",
    )
    pipeline: SkipValidation[Any] = Field(
        None,
        description="INTERNAL USAGE ONLY, the sigma ProcessingPipeline to get mapping from.",
    )
    extra: Optional[dict[str, Any]] = Field(
        {},
        description="any extra custom options, i.e. the ones listed in plugin.options().",
    )

    model_config = {
        "json_schema_extra": {
            "example": {
                "mapping_file": "my_mapping.json",
                "mapping_id": "my_mapping_id",
                "config_override": {"parallel_processes_respawn_after_tasks": 500},
                "extra": {"my_custom_option": "my_custom_value"},
            }
        }
    }

    def to_dict(self) -> dict:
        """
        return the parameters as a plain dict, one key per field.

        values are the very same objects held by the instance (nothing is
        copied), so mutating a returned dict/list also mutates the instance.
        """
        return {
            "mapping_file": self.mapping_file,
            "mapping_id": self.mapping_id,
            "config_override": self.config_override,
            "ignore_mapping_ingest": self.ignore_mapping_ingest,
            "ignore_mapping_sigma_query": self.ignore_mapping_sigma_query,
            "extra": self.extra,
            "timestamp_field": self.timestamp_field,
            "record_to_gulp_document_fun": self.record_to_gulp_document_fun,
            "pipeline": self.pipeline,
        }

    @staticmethod
    def from_dict(d: dict) -> "GulpPluginParams":
        """
        build a GulpPluginParams from a dict shaped like the one returned by to_dict().

        keys missing from d fall back to the same defaults declared on the
        fields; keys not listed here are ignored.
        """
        return GulpPluginParams(
            mapping_file=d.get("mapping_file", None),
            mapping_id=d.get("mapping_id", None),
            timestamp_field=d.get("timestamp_field", None),
            ignore_mapping_ingest=d.get("ignore_mapping_ingest", False),
            ignore_mapping_sigma_query=d.get("ignore_mapping_sigma_query", False),
            config_override=d.get("config_override", {}),
            extra=d.get("extra", {}),
            record_to_gulp_document_fun=d.get("record_to_gulp_document_fun", []),
            pipeline=d.get("pipeline", None),
        )

    @model_validator(mode="before")
    @classmethod
    def to_py_dict(cls, data: Any) -> Any:
        """
        normalize the raw input before field validation.

        - None is turned into an empty dict (every field takes its default)
        - a JSON document (str, bytes or bytearray) is decoded with json.loads
        - any other value (a dict, an already built instance, ...) is returned
          unchanged and left to the regular validation

        json.loads raises json.JSONDecodeError (a ValueError) on malformed JSON.
        """
        if data is None:
            return {}

        # only feed json.loads the types it accepts: anything else used to
        # blow up with a bare TypeError instead of being validated normally.
        if isinstance(data, (str, bytes, bytearray)):
            return json.loads(data)
        return data
95
96
98 """
99 defines plugin specific supported options, passed through GulpPluginParams.extra
100 """
101 def __init__(self, name: str, t: str, desc: str, default: any=None):
102 self.name = name
103 self.t = t
104 self.default = default
105 self.desc = desc
106
107 def to_dict(self) -> dict:
108 return {
109 "name": self.name,
110 "type": self.t,
111 "default": self.default,
112 "desc": self.desc
113 }
__init__(self, str name, str t, str desc, any default=None)
"GulpPluginParams" from_dict(dict d)