(g)ULP!
Loading...
Searching...
No Matches
plugin_internal.py
Go to the documentation of this file.
1import json
2from typing import Any, Optional
3
4from pydantic import BaseModel, Field, SkipValidation, model_validator
5
6
7class GulpPluginParams(BaseModel):
8 """
9 parameters for a plugin, to be passed to ingest and query API
10 """
11
12 mapping_file: Optional[str] = Field(
13 None,
14 description='mapping file name (in gulp/mapping_files directory) to read "mappings" array from, if any.',
15 )
16
17 mapping_id: Optional[str] = Field(
18 None,
19 description="mapping identifier, i.e. to select this mapping via GulpMappingOptions.",
20 )
21
22 config_override: Optional[dict[str, Any]] = Field(
23 {}, description="allow to override gulp configuration parameters."
24 )
25 ignore_mapping_ingest: Optional[bool] = Field(
26 False,
27 description="ignore mapping when ingesting (to be compatible with OpenSearch Security Analytics).",
28 )
29 ignore_mapping_sigma_query: Optional[bool] = Field(
30 False,
31 description="ignore mapping when querying using Sigma rules.",
32 )
33 timestamp_field: Optional[str] = Field(
34 None,
35 description="The timestamp field (for, i.e. use the a generic plugin without any mapping)",
36 )
37 record_to_gulp_document_fun: SkipValidation[Any] = Field(
38 [],
39 description="INTERNAL USAGE ONLY, to get mapping from (for stacked plugins).",
40 )
41 pipeline: SkipValidation[Any] = Field(
42 None,
43 description="INTERNAL USAGE ONLY, the sigma ProcessingPipeline to get mapping from.",
44 )
45 extra: Optional[dict[str, Any]] = Field(
46 {},
47 description="any extra custom options, i.e. the ones listed in plugin.options().",
48 )
49
50 model_config = {
51 "json_schema_extra": {
52 "example": {
53 "mapping_file": "my_mapping.json",
54 "mapping_id": "my_mapping_id",
55 "config_override": {"parallel_processes_respawn_after_tasks": 500},
56 "extra": {"my_custom_option": "my_custom_value"},
57 }
58 }
59 }
60
61 def to_dict(self) -> dict:
62 d = {
63 "mapping_file": self.mapping_file,
64 "mapping_id": self.mapping_id,
65 "config_override": self.config_override,
66 "ignore_mapping_ingest": self.ignore_mapping_ingest,
67 "ignore_mapping_sigma_query": self.ignore_mapping_sigma_query,
68 "extra": self.extra,
69 "timestamp_field": self.timestamp_field,
70 "record_to_gulp_document_fun": self.record_to_gulp_document_fun,
71 "pipeline": self.pipeline,
72 }
73 return d
74
75 @staticmethod
76 def from_dict(d: dict) -> "GulpPluginParams":
77 return GulpPluginParams(
78 mapping_file=d.get("mapping_file", None),
79 mapping_id=d.get("mapping_id", None),
80 timestamp_field=d.get("timestamp_field", None),
81 ignore_mapping_ingest=d.get("ignore_mapping_ingest", False),
82 ignore_mapping_sigma_query=d.get("ignore_mapping_sigma_query", False),
83 config_override=d.get("config_override", {}),
84 extra=d.get("extra", {}),
85 record_to_gulp_document_fun=d.get("record_to_gulp_document_fun", []),
86 pipeline=d.get("pipeline", None),
87 )
88
89 @model_validator(mode="before")
90 @classmethod
91 def to_py_dict(cls, data: str | dict):
92 if data is None or len(data) == 0:
93 return {}
94
95 if isinstance(data, dict):
96 return data
97 return json.loads(data)
98
99
101 """
102 this is used by the UI through the plugin.options() method to list the supported options, and their types, for a plugin.
103 """
104
105 def __init__(self, name: str, t: str, desc: str, default: any = None):
106 """
107 :param name: option name
108 :param t: option type (use "bool", "str", "int", "float", "dict", "list" for the types.)
109 :param desc: option description
110 :param default: default value
111 """
112 self.name = name
113 self.t = t
114 self.default = default
115 self.desc = desc
116
117 def to_dict(self) -> dict:
118 return {
119 "name": self.name,
120 "type": self.t,
121 "default": self.default,
122 "desc": self.desc,
123 }
__init__(self, str name, str t, str desc, any default=None)
"GulpPluginParams" from_dict(dict d)