(g)ULP!
Loading...
Searching...
No Matches
models.py
Go to the documentation of this file.
1import json
2from typing import Any, Optional
3
4from pydantic import BaseModel, Field, model_validator
5
6
class GulpMappingOptions(BaseModel):
    """
    GulpMappingOptions defines global options for a mapping (an array of FieldMappingEntry).

    NOTE: in the plain-dict form handled by to_dict()/from_dict(), the "ignore_blanks" field
    is stored under the legacy key "purge_blanks".
    """

    agent_type: Optional[str] = Field(
        None,
        description='if set, forces events for this mapping to have "agent.type" set to this value.',
    )

    default_event_code: Optional[str] = Field(
        None,
        description='if set, this is the "event_code" to be set for this mapping: this may be overridden by "event_code" in FieldMappingEntry when "is_timestamp": true forces generation of a new event with such "@timestamp" and, possibly, "event.code".',
    )
    ignore_unmapped: Optional[bool] = Field(
        False,
        description='if True, ignore(=do not set in the output document) unmapped fields. Default is False, if a field does not have a corresponding mapping entry it will be mapped as "gulp.unmapped.fieldname").',
    )
    mapping_id: Optional[str] = Field(
        None,
        description="mapping identifier, i.e. to select this mapping via GulpPluginParams.",
    )
    ignore_blanks: Optional[bool] = Field(
        True,
        description="if True, remove blank (string) fields from the event (default: True).",
    )
    timestamp_yearfirst: Optional[bool] = Field(
        True,
        description="to convert timestamp string, indicating if the year comes first in the timestamp string (default: True).",
    )
    timestamp_dayfirst: Optional[bool] = Field(
        False,
        description="to convert timestamp string, indicating if the day comes first in the timestamp string (default: False).",
    )
    timestamp_utc: Optional[bool] = Field(
        True,
        description="to convert timestamp string, indicating if the converted timestamp should be UTC (default: True).",
    )

    # the example uses the real field names, so it validates as-is against this model
    model_config = {
        "json_schema_extra": {
            "example": {
                "agent_type": "my_agent_type",
                "default_event_code": "my_event_code",
                "mapping_id": "my_mapping_id",
                "ignore_unmapped": False,
                "ignore_blanks": True,
                "timestamp_yearfirst": True,
                "timestamp_dayfirst": False,
                "timestamp_utc": True,
            }
        }
    }

    def to_dict(self) -> dict:
        """
        returns the options as a plain dict.

        NOTE: "ignore_blanks" is emitted under the legacy key "purge_blanks", which is the key
        from_dict() reads back.
        """
        return {
            "agent_type": self.agent_type,
            "mapping_id": self.mapping_id,
            "default_event_code": self.default_event_code,
            "purge_blanks": self.ignore_blanks,
            "ignore_unmapped": self.ignore_unmapped,
            "timestamp_yearfirst": self.timestamp_yearfirst,
            "timestamp_dayfirst": self.timestamp_dayfirst,
            "timestamp_utc": self.timestamp_utc,
        }

    @staticmethod
    def from_dict(d: dict) -> "GulpMappingOptions":
        """
        builds a GulpMappingOptions from a plain dict, applying the field defaults for missing keys.

        the blank-removal flag is accepted both as "purge_blanks" (legacy key, emitted by to_dict())
        and as "ignore_blanks" (the model field name).
        """
        # the legacy key wins when both are present, so to_dict() -> from_dict() round-trips unchanged
        ignore_blanks = d.get("purge_blanks", d.get("ignore_blanks", True))
        return GulpMappingOptions(
            agent_type=d.get("agent_type", None),
            default_event_code=d.get("default_event_code", None),
            mapping_id=d.get("mapping_id", None),
            ignore_blanks=ignore_blanks,
            ignore_unmapped=d.get("ignore_unmapped", False),
            timestamp_yearfirst=d.get("timestamp_yearfirst", True),
            timestamp_dayfirst=d.get("timestamp_dayfirst", False),
            timestamp_utc=d.get("timestamp_utc", True),
        )
85
86
class FieldMappingEntry(BaseModel):
    """
    FieldMappingEntry defines how to map a single field, including field-specific options.
    """

    map_to: Optional[str | list[str] | list[list[str]]] = Field(
        None,
        description="""str for single mapping (field: mapped_field),<br>
        list[str] for multiple mapping (field: [mapped_field1, mapped_field2],<br>
        list[list[str]] for variable mapping.
        <br><br>
        a variable mapping is defined as a list of arrays, where each array is a condition to match.
        i.e. field: [["service", "security", "user.name"]] means
        "field" must be mapped as "user.name" when logsource field="service" and its name="security".
        <br><br>
        map_to may also not be set: in such case, the field is mapped as gulp.unmapped.fieldname.
        """,
    )
    is_timestamp: Optional[bool] = Field(
        False,
        description='if True, this field refer to a timestamp and will generate "@timestamp" and "@timestamp_nsec" fields.<br>'
        'NOTE: if more than one "is_timestamp" is set in the mapping, multiple events with such "@timestamp" and, possibly, "event.code" are generated.',
    )
    is_timestamp_chrome: Optional[bool] = Field(
        False,
        description="if set and if value is a number, or string representing a number, it is interpreted as a chrome/webkit timestamp (with epoch 01/01/1601) and converted to **nanoseconds from unix epoch**.<br>"
        'NOTE: ignored if "do_multiply" is set, if set together with "is_timestamp" will generate converted chrome "@timestamp" and "@timestamp_nsec".',
    )
    do_multiply: Optional[float] = Field(
        None,
        description="if set and if value is a number, or string representing a number, multiply it by this value (to divide, i.e. multiply by 0.5 to divide by 2).<br>"
        'NOTE: ignored if "is_timestamp_chrome" is set, result of the multiply/divide must be **nanoseconds**.',
    )
    event_code: Optional[str] = Field(
        None,
        description='if set, overrides "default_event_code" in the GulpMappingOptions (ignored if "is_timestamp" is not set).',
    )
    is_variable_mapping: Optional[bool] = Field(
        False,
        description='INTERNAL USAGE ONLY. if True, indicates the "map_to" field is a variable mapping.',
    )
    result: Optional[dict[str, Any]] = Field(
        None,
        description='INTERNAL USAGE ONLY. the destination dict representing the field (i.e. {"@timestamp": 123456}).',
    )

    def to_dict(self) -> dict:
        """
        returns the user-facing part of the entry as a plain dict.

        NOTE: "is_variable_mapping" and "result" (both marked INTERNAL USAGE ONLY) are not included.
        """
        return {
            "map_to": self.map_to,
            "is_timestamp": self.is_timestamp,
            "event_code": self.event_code,
            "do_multiply": self.do_multiply,
            "is_timestamp_chrome": self.is_timestamp_chrome,
        }

    @staticmethod
    def from_dict(d: dict) -> "FieldMappingEntry":
        """
        builds a FieldMappingEntry from a plain dict, applying the field defaults for missing keys.

        NOTE: "result" is never read from the dict, it is left at its default (None).
        """
        return FieldMappingEntry(
            map_to=d.get("map_to", None),
            event_code=d.get("event_code", None),
            do_multiply=d.get("do_multiply", None),
            is_variable_mapping=d.get("is_variable_mapping", False),
            is_timestamp=d.get("is_timestamp", False),
            is_timestamp_chrome=d.get("is_timestamp_chrome", False),
        )

    model_config = {
        "json_schema_extra": {
            "example": {
                "map_to": "my_field",
                "event_code": "my_event_code",
                "is_timestamp": False,
            }
        }
    }
166
167
class GulpMapping(BaseModel):
    """
    defines a source->elasticsearch document mapping for a Gulp plugin.
    """

    fields: dict[str, FieldMappingEntry] = Field(
        {},
        description='a dictionary where each key is a source field to be mapped to "map_to" defined in "FieldMappingEntry".',
    )
    options: Optional[GulpMappingOptions] = Field(
        None,
        description="specific options for this GulpMapping. if None, defaults are applied.",
    )

    # the "options" example only uses keys that exist as GulpMappingOptions fields
    model_config = {
        "json_schema_extra": {
            "example": {
                "fields": {
                    "source_field1_single": {"map_to": "my_field"},
                    "source_field2_multiple": {"map_to": ["my_field1", "myfield2"]},
                    "source_field3_variable": {
                        "map_to": [
                            ["category", "process_creation", "process.pe.company"],
                            ["category", "image_load", "file.pe.company"],
                        ]
                    },
                },
                "options": {
                    "agent_type": "my_agent_type",
                    "default_event_code": "my_event_code",
                    "mapping_id": "my_mapping_id",
                    "ignore_unmapped": False,
                    "timestamp_yearfirst": True,
                    "timestamp_dayfirst": False,
                    "timestamp_utc": True,
                },
            }
        }
    }

    def to_dict(self) -> dict:
        """
        returns the mapping as a plain dict with "fields" and "options" keys.

        NOTE: options is guaranteed to be not None (default values are applied if None)
        """
        # fall back to a default-constructed GulpMappingOptions so "options" is always a dict
        options = self.options if self.options is not None else GulpMappingOptions()
        return {
            "fields": {k: v.to_dict() for k, v in self.fields.items()},
            "options": options.to_dict(),
        }

    @staticmethod
    def from_dict(d: dict) -> "GulpMapping":
        """
        builds a GulpMapping from a plain dict.

        a missing, None or empty "fields" yields no field entries; a missing or None "options"
        yields default options.
        """
        raw_fields = d.get("fields") or {}
        fields = {k: FieldMappingEntry.from_dict(v) for k, v in raw_fields.items()}

        opts = d.get("options", None)
        if opts is not None:
            options = GulpMappingOptions.from_dict(opts)
        else:
            # default values
            options = GulpMappingOptions()

        return GulpMapping(
            fields=fields,
            options=options,
        )

    @model_validator(mode="before")
    @classmethod
    def to_py_dict(cls, data: Any):
        """
        normalizes the raw input before validation.

        None and empty strings become an empty dict, non-empty JSON strings are decoded,
        anything else (dicts included) is handed to pydantic untouched.
        """
        if data is None:
            return {}

        if isinstance(data, (str, bytes, bytearray)):
            # empty text means "no mapping"; invalid JSON raises and surfaces as a validation error
            return json.loads(data) if data else {}

        # do not call len() or json.loads() here: the input may be an object that supports neither
        return data
Definition models.py:87
dict to_dict(self)
Definition models.py:133
Optional event_code
Definition models.py:120
Optional is_timestamp
Definition models.py:105
Optional do_multiply
Definition models.py:115
"FieldMappingEntry" from_dict(dict d)
Definition models.py:144
Optional map_to
Definition models.py:92
Optional is_timestamp_chrome
Definition models.py:110
to_py_dict(cls, str|dict data)
Definition models.py:244
"GulpMapping" from_dict(dict d)
Definition models.py:224
"GulpMappingOptions" from_dict(dict d)
Definition models.py:74