(g)ULP!
Plugin Class Reference

Public Member Functions

str desc (self)
 
GulpRequestStatus ingest (self, str index, str req_id, int client_id, int operation_id, str context, str|list[dict] source, str ws_id, GulpPluginParams plugin_params=None, GulpIngestionFilter flt=None, **kwargs)
 
str name (self)
 
list[GulpPluginOption] options (self)
 
list[GulpDocument] record_to_gulp_document (self, int operation_id, int client_id, str context, str source, TmpIngestStats fs, any record, int record_idx, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, **kwargs)
 
GulpPluginType type (self)
 
str version (self)
 
- Public Member Functions inherited from PluginBase
None __init__ (self, str path, AsyncEngine collab=None, AsyncElasticsearch elastic=None, **kwargs)
 
None cleanup (self)
 
list[str] depends_on (self)
 
str event_type_field (self)
 
str get_unmapped_field_name (self, str field)
 
tuple[dict, GulpMapping] ingest_plugin_initialize (self, str index, str|dict source, bool skip_mapping=False, ProcessingPipeline pipeline=None, str mapping_file=None, str mapping_id=None, GulpPluginParams plugin_params=None)
 
bool internal (self)
 
logging.Logger logger (cls)
 
ProcessingPipeline pipeline (self, GulpPluginParams plugin_params=None, **kwargs)
 
dict run_command (self, GulpPluginParams p)
 
ProcessingPipeline sigma_plugin_initialize (self, ProcessingPipeline pipeline=None, str mapping_file=None, str mapping_id=None, str product=None, GulpPluginParams plugin_params=None)
 
list[str] tags (self)
 

Additional Inherited Members

- Public Attributes inherited from PluginBase
list buffer = []
 
str client_id = None
 
 collab = collab
 
tuple context = (None,)
 
 elastic = elastic
 
str index = None
 
str operation_id = None
 
 path = path
 
str req_id = None
 
str ws_id = None
 
- Protected Member Functions inherited from PluginBase
list[GulpDocument] _build_gulpdocuments (self, list[FieldMappingEntry] fme, int idx, int operation_id, str context, str plugin, int client_id, str raw_event, str original_id, str src_file, int timestamp=None, int timestamp_nsec=None, str event_code=None, list[str] cat=None, int duration_nsec=0, GulpLogLevel gulp_log_level=None, str original_log_level=None, bool remove_raw_event=False, **kwargs)
 
list[dict] _build_ingestion_chunk_for_ws (self, list[dict] docs, GulpIngestionFilter flt=None)
 
list[GulpDocument] _call_record_to_gulp_document_funcs (self, int operation_id, int client_id, str context, str source, TmpIngestStats fs, any record, int record_idx, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, Callable record_to_gulp_document_fun=None, **kwargs)
 
GulpRequestStatus _finish_ingestion (self, str index, str|dict source, str req_id, int client_id, str ws_id, TmpIngestStats fs, GulpIngestionFilter flt=None)
 
TmpIngestStats _flush_buffer (self, str index, TmpIngestStats fs, str ws_id, str req_id, GulpIngestionFilter flt=None, bool wait_for_refresh=False)
 
TmpIngestStats _ingest_record (self, str index, GulpDocument|dict doc, TmpIngestStats fs, str ws_id, str req_id, GulpIngestionFilter flt=None, bool flush_enabled=True, **kwargs)
 
list[FieldMappingEntry] _map_source_key (self, GulpPluginParams plugin_params, GulpMapping custom_mapping, str source_key, Any v, dict index_type_mapping=None, bool ignore_custom_mapping=False, **kwargs)
 
TmpIngestStats _parser_failed (self, TmpIngestStats fs, str|dict source, Exception|str ex)
 
tuple[GulpMapping, GulpPluginParams] _process_plugin_params (self, GulpMapping custom_mapping, GulpPluginParams plugin_params=None)
 
tuple[TmpIngestStats, bool] _process_record (self, str index, any record, int record_idx, Callable my_record_to_gulp_document_fun, str ws_id, str req_id, int operation_id, int client_id, str context, str source, TmpIngestStats fs, GulpMapping custom_mapping=None, dict index_type_mapping=None, str plugin=None, GulpPluginParams plugin_params=None, GulpIngestionFilter flt=None, **kwargs)
 
TmpIngestStats _record_failed (self, TmpIngestStats fs, any entry, str|dict source, Exception|str ex)
 
any _type_checks (self, any v, str k, dict index_type_mapping)
 

Detailed Description

CSV generic file processor

the csv plugin may ingest any CSV file itself, but it is also used as a base plugin for other plugins (in "stacked" mode).

### standalone mode

when used by itself, it is sufficient to ingest a CSV file with the default settings (no extra parameters needed).

NOTE: each document stored on elasticsearch must have a "@timestamp", so either a mapping file must be provided or "timestamp_field" must be set to the name of a field in the CSV file.

~~~bash
# all CSV fields will result in "gulp.unmapped.*" fields, the timestamp will be set from the "UpdateTimestamp" field
TEST_PLUGIN_PARAMS='{"timestamp_field": "UpdateTimestamp"}' TEST_PLUGIN=csv ./test_scripts/test_ingest.sh -p ./samples/mftecmd/sample_j.csv

# use a mapping file
# a mapping file may hold more than one mapping definition with its own options (as defined in helpers.get_mapping_from_file())
TEST_PLUGIN_PARAMS='{"mapping_file": "mftecmd_csv.json", "mapping_id": "j"}' TEST_PLUGIN=csv ./test_scripts/test_ingest.sh -p ./samples/mftecmd/sample_j.csv
~~~
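The timestamp requirement in the note above can be checked before launching an ingestion. The following is a minimal sketch and not part of gulp: `build_plugin_params` is a hypothetical helper, and it assumes that the names used in the test commands (`timestamp_field`, `mapping_file`, `mapping_id`) are top-level keys of the JSON passed through `TEST_PLUGIN_PARAMS`.

```python
import json


def build_plugin_params(timestamp_field=None, mapping_file=None, mapping_id=None):
    """Hypothetical helper: build the JSON string for TEST_PLUGIN_PARAMS."""
    if timestamp_field is None and mapping_file is None:
        # Without one of the two there is no source for "@timestamp".
        raise ValueError("set timestamp_field or provide a mapping_file")

    params = {}
    if timestamp_field is not None:
        params["timestamp_field"] = timestamp_field
    if mapping_file is not None:
        params["mapping_file"] = mapping_file
    if mapping_id is not None:
        params["mapping_id"] = mapping_id
    return json.dumps(params)


# The two configurations used in the commands above.
print(build_plugin_params(timestamp_field="UpdateTimestamp"))
print(build_plugin_params(mapping_file="mftecmd_csv.json", mapping_id="j"))
```

The check only mirrors the rule stated in the note; whether a given mapping file actually defines a timestamp is not verified here.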

### stacked mode

in stacked mode, we simply run the stacked plugin, which in turn uses the CSV plugin to parse the data.

~~~bash
TEST_PLUGIN=stacked_example ./test_scripts/test_ingest.sh -p ./samples/mftecmd/sample_j.csv
~~~

see the example in [stacked_example.py](stacked_example.py)
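The layering idea behind stacked mode can be illustrated with plain functions. This is only a sketch under stated assumptions: `base_parse` and `stacked_parse` are hypothetical names, the `enriched` field is invented for illustration, and gulp's real stacking mechanism (shown in stacked_example.py) is not reproduced here.

```python
import csv
import io


def base_parse(text, delimiter=","):
    """Base layer: turn CSV text into one dict per row."""
    return list(csv.DictReader(io.StringIO(text), delimiter=delimiter))


def stacked_parse(text):
    """Stacked layer: delegate parsing to the base layer, then post-process."""
    documents = base_parse(text)
    for doc in documents:
        # Hypothetical enrichment step applied on top of the base output.
        doc["enriched"] = True
    return documents


print(stacked_parse("name,size\nfile.txt,12\n"))
```

The stacked layer never touches the raw CSV; it only sees what the base layer produced, which is the same division of labour described above.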

### parameters

the CSV plugin supports the following custom parameters in the plugin_params.extra dictionary:

- `delimiter`: set the delimiter for the CSV file (default=",")
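As a rough illustration of how the `delimiter` parameter takes effect, the sketch below looks it up in an `extra` dictionary with "," as the fallback and hands it to the standard library CSV reader. `read_rows` is a hypothetical function; the plugin itself reads files asynchronously, so this shows only the lookup-with-default behaviour.

```python
import csv
import io


def read_rows(text, extra=None):
    """Parse CSV text using the delimiter found in extra (default ",")."""
    extra = extra or {}
    delimiter = extra.get("delimiter", ",")
    return list(csv.DictReader(io.StringIO(text), delimiter=delimiter))


# Default delimiter.
print(read_rows("a,b\n1,2\n"))
# Semicolon-separated file, selected through the extra dictionary.
print(read_rows("a;b\n1;2\n", extra={"delimiter": ";"}))
```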


Definition at line 25 of file csv.py.

Member Function Documentation

◆ desc()

str desc ( self)
Returns a description of the plugin.

Reimplemented from PluginBase.

Definition at line 68 of file csv.py.

    def desc(self) -> str:
        return """generic CSV file processor"""

◆ ingest()

GulpRequestStatus ingest ( self,
str index,
str req_id,
int client_id,
int operation_id,
str context,
str | list[dict] source,
str ws_id,
GulpPluginParams plugin_params = None,
GulpIngestionFilter flt = None,
** kwargs )
Ingests a file using the plugin.

NOTE: implementers should call super().ingest() in their implementation.
NOTE: this function *SHOULD NOT* raise exceptions

Args:
    index (str): name of the elasticsearch index to ingest the document into.
    req_id (str): The request ID related to this ingestion (must exist on the collab db).
    client_id (int): The client ID performing the ingestion.
    operation_id (int): The operation ID related to this ingestion.
    context (str): Context related to this ingestion.
    source (str|list[dict]): The path to the file to ingest, or a list of events dicts.
    ws_id (str): The websocket ID
    plugin_params (GulpPluginParams): additional parameters to pass to the ingestion function. Defaults to None.
    flt (GulpIngestionFilter, optional): filter to apply to this ingestion, if any. Defaults to None.
    kwargs: additional arguments if any
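The "should not raise" contract noted above is commonly met by recording failures in a statistics object instead of propagating them. The sketch below shows that pattern in isolation; `IngestStats` and `ingest_records` are hypothetical stand-ins and do not reflect the actual fields of TmpIngestStats.

```python
from dataclasses import dataclass, field


@dataclass
class IngestStats:
    """Hypothetical stand-in for an ingestion statistics object."""
    processed: int = 0
    failed: int = 0
    errors: list = field(default_factory=list)


def ingest_records(records, convert):
    """Convert every record, collecting errors instead of raising."""
    stats = IngestStats()
    try:
        for record in records:
            try:
                convert(record)
                stats.processed += 1
            except Exception as ex:
                # A single bad record is counted and skipped.
                stats.failed += 1
                stats.errors.append(str(ex))
    except Exception as ex:
        # The source itself failed (for example, the iterator raised).
        stats.errors.append(str(ex))
    return stats


stats = ingest_records(["1", "x", "3"], int)
print(stats.processed, stats.failed)
```

The caller always gets a result back and decides from the counters whether the ingestion succeeded.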

Reimplemented from PluginBase.

Definition at line 142 of file csv.py.

    ) -> GulpRequestStatus:

        await super().ingest(
            index=index,
            req_id=req_id,
            client_id=client_id,
            operation_id=operation_id,
            context=context,
            source=source,
            ws_id=ws_id,
            plugin_params=plugin_params,
            flt=flt,
            **kwargs,
        )

        fs = TmpIngestStats(source)

        # initialize mapping
        index_type_mapping, custom_mapping = await self.ingest_plugin_initialize(
            index, source, plugin_params=plugin_params
        )

        # check plugin_params
        try:
            custom_mapping, plugin_params = self._process_plugin_params(
                custom_mapping, plugin_params
            )
        except InvalidArgument as ex:
            fs = self._parser_failed(fs, source, ex)
            return await self._finish_ingestion(index, source, req_id, client_id, ws_id, fs=fs, flt=flt)

        self.logger().debug("custom_mapping=%s" % (custom_mapping))

        delimiter = plugin_params.extra.get("delimiter", ",")

        if custom_mapping.options.agent_type is None:
            plugin = self.name()
        else:
            plugin = custom_mapping.options.agent_type
            Plugin.logger().warning("using plugin name=%s" % (plugin))

        ev_idx = 0
        try:
            async with aiofiles.open(
                source, mode="r", encoding="utf-8", newline=""
            ) as f:
                async for line_dict in AsyncDictReader(f, delimiter=delimiter):
                    # rebuild original line and fix dict (remove BOM, if present)
                    line = ""
                    fixed_dict = {}
                    for k, v in line_dict.items():
                        if v is not None and len(v) > 0:
                            k = muty.string.remove_unicode_bom(k)
                            fixed_dict[k] = v

                        if v is None:
                            v = ""

                        line += v + delimiter

                    # add original line as __line__
                    line = line[:-1]
                    fixed_dict["__line__"] = line

                    # convert record to gulp document
                    try:
                        fs, must_break = await self._process_record(index, fixed_dict, ev_idx,
                                                                    self.record_to_gulp_document,
                                                                    ws_id, req_id, operation_id, client_id,
                                                                    context, source, fs,
                                                                    custom_mapping=custom_mapping,
                                                                    index_type_mapping=index_type_mapping,
                                                                    plugin=self.name(),
                                                                    plugin_params=plugin_params,
                                                                    flt=flt,
                                                                    **kwargs)
                        ev_idx += 1
                        if must_break:
                            break

                    except Exception as ex:
                        fs = self._record_failed(fs, fixed_dict, source, ex)

        except Exception as ex:
            # add an error
            fs = self._parser_failed(fs, source, ex)

        # done
        return await self._finish_ingestion(index, source, req_id, client_id, ws_id, fs, flt)
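The row clean-up performed inside the read loop above (dropping empty values, stripping a BOM from keys, and rebuilding the raw line as `__line__`) can be tried in isolation with the standard library. This is a simplified, synchronous sketch: `fix_row` is a hypothetical name, and `str.lstrip` with the BOM character is used as an assumed stand-in for `muty.string.remove_unicode_bom`.

```python
import csv
import io

BOM = "\ufeff"


def fix_row(row, delimiter=","):
    """Return a cleaned copy of a DictReader row plus its rebuilt raw line."""
    line = ""
    fixed = {}
    for key, value in row.items():
        if value is not None and len(value) > 0:
            # Keep only non-empty values, with any BOM removed from the key.
            fixed[key.lstrip(BOM)] = value
        # Every column contributes to the rebuilt line, even when empty.
        line += (value or "") + delimiter
    # Drop the trailing delimiter added by the loop.
    fixed["__line__"] = line[: -len(delimiter)] if line else line
    return fixed


text = "\ufeffname,size,note\nfile.txt,12,\n"
rows = [fix_row(r) for r in csv.DictReader(io.StringIO(text))]
print(rows)
```

Note that the rebuilt line is a plain join of the values: fields that were quoted in the source file because they contain the delimiter are not re-quoted.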

◆ name()

str name ( self)
Returns the name of the plugin.

Reimplemented from PluginBase.

Definition at line 78 of file csv.py.

    def name(self) -> str:
        return "csv"

◆ options()

list[GulpPluginOption] options ( self)
Returns the list of available GulpPluginOption entries (plugin-specific parameters).

Reimplemented from PluginBase.

Definition at line 71 of file csv.py.

    def options(self) -> list[GulpPluginOption]:
        return [
            GulpPluginOption(
                "delimiter", "str", "delimiter for the CSV file", default=","
            )
        ]

◆ record_to_gulp_document()

list[GulpDocument] record_to_gulp_document ( self,
int operation_id,
int client_id,
str context,
str source,
TmpIngestStats fs,
any record,
int record_idx,
GulpMapping custom_mapping = None,
dict index_type_mapping = None,
str plugin = None,
GulpPluginParams plugin_params = None,
** kwargs )
Converts a record to one or more GulpDocument objects based on the provided index mappings.

Args:
    operation_id (int): The operation ID associated with the record.
    client_id (int): The client ID associated with the record.
    context (str): The context associated with the record.
    source (str): The source of the record (source file name or path, usually).
    fs (TmpIngestStats): The temporary ingestion statistics (may be updated on return).
    record (any): record to convert, in a plugin-dependent format. Note that stacked plugins receive a list of GulpDocument objects here instead (since the original record may generate one or more documents).
    record_idx (int): The index of the record in source.
    custom_mapping (GulpMapping, optional): The custom mapping to use for the conversion. Defaults to None.
    index_type_mapping (dict, optional): elastic search index type mappings { "ecs_field": "type", ... }. Defaults to None.
    plugin (str, optional): "agent.type" to be set in the GulpDocument. Defaults to None.
    plugin_params (GulpPluginParams, optional): The plugin parameters to use, if any. Defaults to None.
    extra (dict, optional): Additional fields to add to the GulpDocument (after applying mapping). Defaults to None.
    **kwargs: Additional keyword arguments:

Returns:
    list[GulpDocument]: The converted GulpDocument objects or None if an exception occurred (fs is updated then).

Raises:
    NotImplementedError: This method is not implemented yet.

Reimplemented from PluginBase.

Definition at line 84 of file csv.py.

    ) -> list[GulpDocument]:

        # Plugin.logger().debug("record: %s" % record)
        event: dict = record

        # get raw csv line (then remove it)
        raw_text: str = event["__line__"]
        del event["__line__"]

        # map all keys for this record
        fme: list[FieldMappingEntry] = []
        for k, v in event.items():
            e = self._map_source_key(
                plugin_params,
                custom_mapping,
                k,
                v,
                index_type_mapping=index_type_mapping,
                **kwargs,
            )
            fme.extend(e)

        # self.logger().debug("processed extra=%s" % (json.dumps(extra, indent=2)))

        # this is the global event code for this mapping, but it may be overridden
        # at field level, anyway
        event_code = (
            custom_mapping.options.default_event_code if custom_mapping is not None else None
        )
        # self.logger().error(f"**** SET FROM PLUGIN ev_code: {event_code}, custom_mapping: {custom_mapping}")
        events = self._build_gulpdocuments(
            fme,
            idx=record_idx,
            operation_id=operation_id,
            context=context,
            plugin=plugin,
            client_id=client_id,
            raw_event=raw_text,
            event_code=event_code,
            original_id=record_idx,
            src_file=os.path.basename(source),
        )
        return events
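The key-mapping step can be pictured with a much simpler model than the one used above. The sketch below assumes a mapping is a plain dictionary from CSV column names to target field names and that unmapped columns are stored under the "gulp.unmapped." prefix mentioned in the class description. `map_record` is a hypothetical function; the real logic lives in `_map_source_key` and works with FieldMappingEntry objects.

```python
def map_record(record, mapping=None, unmapped_prefix="gulp.unmapped."):
    """Rename mapped keys and prefix every key that has no mapping."""
    mapping = mapping or {}
    doc = {}
    for key, value in record.items():
        target = mapping.get(key)
        if target is None:
            # No mapping for this column: keep it under the unmapped prefix.
            doc[unmapped_prefix + key] = value
        else:
            doc[target] = value
    return doc


record = {"Name": "file.txt", "Size": "12"}
print(map_record(record))
print(map_record(record, {"Name": "file.name"}))
```

With no mapping at all, every column ends up under the unmapped prefix, which matches the behaviour described for standalone mode when only "timestamp_field" is set.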

◆ type()

GulpPluginType type ( self)
Returns the plugin type.

Reimplemented from PluginBase.

Definition at line 65 of file csv.py.

    def type(self) -> GulpPluginType:
        return GulpPluginType.INGESTION

◆ version()

str version ( self)
Returns plugin version.

Reimplemented from PluginBase.

Definition at line 81 of file csv.py.

    def version(self) -> str:
        return "1.0"

The documentation for this class was generated from the following file: