(g)ULP!
Loading...
Searching...
No Matches
models.py
Go to the documentation of this file.
1import json
2from typing import Any, Optional
3
4from pydantic import BaseModel, Field, model_validator
5
6
7class GulpMappingOptions(BaseModel):
8 """
9 GulpMappingOptions defines global options for a mapping (an array of FieldMappingEntry).
10 """
11
12 agent_type: Optional[str] = Field(
13 None,
14 description='if set, forces events fot this mapping to have "agent.type" set to this value.',
15 )
16
17 default_event_code: Optional[str] = Field(
18 None,
19 description='if set, this is the "event_code" to be set for this mapping: this may be overridden by "event_code" in FieldMappingEntry when "is_timestamp": true forces generation of a new event with such "@timestamp" and, possibly, "event.code".',
20 )
21 ignore_unmapped: Optional[bool] = Field(
22 False,
23 description='if True, ignore(=do not set in the output document) unmapped fields. Default is False, if a field does not have a corresponding mapping entry it will be mapped as "gulp.unmapped.fieldname").',
24 )
25 mapping_id: Optional[str] = Field(
26 None,
27 description="mapping identifier, i.e. to select this mapping via GulpPluginParams.",
28 )
29 ignore_blanks: Optional[bool] = Field(
30 True,
31 description="if True, remove blank (string) fields from the event (default: True).",
32 )
33 timestamp_yearfirst: Optional[bool] = Field(
34 True,
35 description="to convert timestamp string, indicating if the year comes first in the timestamp string (default: True).",
36 )
37 timestamp_dayfirst: Optional[bool] = Field(
38 False,
39 description="to convert timestamp string, indicating if the day comes first in the timestamp string (default: False).",
40 )
41 timestamp_utc: Optional[bool] = Field(
42 True,
43 description="to convert timestamp string, indicating if the converted timestamp should be UTC (default: True).",
44 )
45
46 model_config = {
47 "json_schema_extra": {
48 "example": {
49 "agent_type": "my_agent_type",
50 "default_event_code": "my_event_code",
51 "mapping_id": "my_mapping_id",
52 "purge_blanks": True,
53 "timestamp_yearfirst": True,
54 "timestamp_dayfirst": False,
55 "timestamp_utc": True,
56 }
57 }
58 }
59
60 def to_dict(self) -> dict:
61 d = {
62 "agent_type": self.agent_type,
63 "mapping_id": self.mapping_id,
64 "default_event_code": self.default_event_code,
65 "purge_blanks": self.ignore_blanks,
66 "ignore_unmapped": self.ignore_unmapped,
67 "timestamp_yearfirst": self.timestamp_yearfirst,
68 "timestamp_dayfirst": self.timestamp_dayfirst,
69 "timestamp_utc": self.timestamp_utc
70 }
71 return d
72
73 @staticmethod
74 def from_dict(d: dict) -> "GulpMappingOptions":
75 return GulpMappingOptions(
76 agent_type=d.get("agent_type", None),
77 default_event_code=d.get("default_event_code", None),
78 mapping_id=d.get("mapping_id", None),
79 ignore_blanks=d.get("purge_blanks", True),
80 ignore_unmapped=d.get("ignore_unmapped", False),
81 timestamp_yearfirst=d.get("timestamp_yearfirst", True),
82 timestamp_dayfirst=d.get("timestamp_dayfirst", False),
83 timestamp_utc=d.get("timestamp_utc", True),
84 )
85
86
87class FieldMappingEntry(BaseModel):
88 """
89 FieldMappingEntry defines how to map a single field, including field-specific options.
90 """
91
92 map_to: Optional[str | list[str] | list[list[str]]] = Field(
93 None,
94 description="""str for single mapping (field: mapped_field),<br>
95 list[str] for multiple mapping (field: [mapped_field1, mapped_field2],<br>
96 list[list[str]] for variable mapping.
97 <br><br>
98 a variable mapping is defined as a list of arrays, where each array is a condition to match.
99 i.e. field: [["service", "security", "user.name"]] means
100 "field" must be mapped as "user.name" when logsource field="service" and its name="security".
101 <br><br>
102 map_to may also not be set: in such case, the field is mapped as gulp.unmapped.fieldname.
103 """,
104 )
105 is_timestamp: Optional[bool] = Field(
106 False,
107 description='if True, this field refer to a timestamp and will generate "@timestamp" and "@timestamp_nsec" fields.<br>'
108 'NOTE: if more than one "is_timestamp" is set in the mapping, multiple events with such "@timestamp" and, possibly, "event.code" are generated.',
109 )
110 is_timestamp_chrome: Optional[bool] = Field(
111 False,
112 description='if set and if value is a number, or string representing a number, it is interpreted as a chrome/webkit timestamp (with epoch 01/01/1601) and converted to **nanoseconds from unix epoch**.<br>'
113 'NOTE: ignored if "do_multiply" is set, if set together with "is_timestamp" will generate converted chrome "@timestamp" and "@timestamp_nsec".'
114 )
115 do_multiply: Optional[float] = Field(
116 None,
117 description='if set and if value is a number, or string representing a number, multiply it by this value (to divide, i.e. multiply by 0.5 to divide by 2).<br>'
118 'NOTE: ignored if "is_timestamp_chrome" is set, result of the multiply/divide must be **nanoseconds**.'
119 )
120 event_code: Optional[str] = Field(
121 None,
122 description='if set, overrides "default_event_code" in the GulpMappingOptions (ignored if "is_timestamp" is not set).'
123 )
124 is_variable_mapping: Optional[bool] = Field(
125 False,
126 description='INTERNAL USAGE ONLY. if True, indicates the "map_to" field is a variable mapping.',
127 )
128 result: Optional[dict[str, Any]] = Field(
129 None, description='INTERNAL USAGE ONLY. the destination dict representing the field (i.e. {"@timestamp": 123456}).'
130 )
131
132 def to_dict(self) -> dict:
133 d = {
134 "map_to": self.map_to,
135 "is_timestamp": self.is_timestamp,
136 "event_code": self.event_code,
137 "do_multiply": self.do_multiply,
138 "is_timestamp_chrome": self.is_timestamp_chrome,
139 }
140 return d
141
142 @staticmethod
143 def from_dict(d: dict) -> "FieldMappingEntry":
144 # print('FieldMappingEntry.from_dict: d=%s, type=%s' % (d, type(d)))
146 map_to=d.get("map_to", None),
147 event_code=d.get("event_code", None),
148 do_multiply=d.get("do_multiply", None),
149 is_variable_mapping=d.get("is_variable_mapping", False),
150 is_timestamp=d.get("is_timestamp", False),
151 is_timestamp_chrome=d.get("is_timestamp_chrome", False)
152 )
153 # print('FieldMappingEntry.from_dict: fm=%s, type=%s' % (fm, type(fm)))
154 return fm
155
156 model_config = {
157 "json_schema_extra": {
158 "example": {
159 "map_to": "my_field",
160 "event_code": "my_event_code",
161 "is_timestamp": False,
162 }
163 }
164 }
165
166
167class GulpMapping(BaseModel):
168 """
169 defines a source->elasticsearch document mapping for a Gulp plugin.
170 """
171
172 fields: dict[str, FieldMappingEntry] = Field(
173 {},
174 description='a dictionary where each key is a source field to be mapped to "map_to" defined in "FieldMappingEntry".',
175 )
176 options: Optional[GulpMappingOptions] = Field(
177 None,
178 description="specific options for this GulpMapping. if None, defaults are applied.",
179 )
180
181 model_config = {
182 "json_schema_extra": {
183 "example": {
184 "fields": {
185 "source_field1_single": {
186 "map_to": "my_field"
187 },
188 "source_field2_multiple": {
189 "map_to": ["my_field1", "myfield2"]
190 },
191 "source_field3_variable": {
192 "map_to": [
193 ["category", "process_creation", "process.pe.company"],
194 ["category", "image_load", "file.pe.company"],
195 ]
196 },
197 },
198 "options": {
199 "agent_type": "my_agent_type",
200 "event_code": "my_event_code",
201 "mapping_id": "my_mapping_id",
202 "ignore_unmapped": False,
203 "timestamp_yearfirst": True,
204 "timestamp_dayfirst": False,
205 "timestamp_tz": "UTC",
206 "extra": {"my_custom_option": "my_custom_value"},
207 },
208 }
209 }
210 }
211
212 def to_dict(self) -> dict:
213 """
214 NOTE: options is guaranteed to be not None (default values are applied if None)
215 """
216 d = {
217 "fields": {k: v.to_dict() for k, v in self.fields.items()},
218 "options": (
219 GulpMappingOptions.to_dict(self.options)
220 if self.options is not None
222 ),
223 }
224 return d
225
226 @staticmethod
227 def from_dict(d: dict) -> "GulpMapping":
228 m = d.get("fields", {})
229 if len(m) > 0:
230 fields = {k: FieldMappingEntry.from_dict(v) for k, v in m.items()}
231 else:
232 fields = {}
233 opts = d.get("options", None)
234 if opts is not None:
235 options = GulpMappingOptions.from_dict(opts)
236 else:
237 # default values
238 options = GulpMappingOptions()
239
240 return GulpMapping(
241 fields=fields,
242 options=options,
243 )
244
245 @model_validator(mode="before")
246 @classmethod
247 def to_py_dict(cls, data: str | dict):
248 if data is None:
249 return {}
250
251 if isinstance(data, dict):
252 return data
253
254 return json.loads(data)
Definition models.py:87
dict to_dict(self)
Definition models.py:132
Optional event_code
Definition models.py:120
Optional is_timestamp
Definition models.py:105
Optional do_multiply
Definition models.py:115
"FieldMappingEntry" from_dict(dict d)
Definition models.py:143
Optional map_to
Definition models.py:92
Optional is_timestamp_chrome
Definition models.py:110
to_py_dict(cls, str|dict data)
Definition models.py:247
"GulpMapping" from_dict(dict d)
Definition models.py:227
"GulpMappingOptions" from_dict(dict d)
Definition models.py:74