(g)ULP!
Loading...
Searching...
No Matches
csv.py
Go to the documentation of this file.
1import os
2from copy import deepcopy
3
4import aiofiles
5import muty.dict
6import muty.os
7import muty.string
8import muty.xml
9
10from gulp.api.collab.base import GulpRequestStatus
11from gulp.api.collab.stats import GulpStats, TmpIngestStats
12from gulp.api.elastic.structs import GulpDocument, GulpIngestionFilter
13from gulp.api.mapping.models import FieldMappingEntry, GulpMapping
14from gulp.defs import GulpPluginType, InvalidArgument
15from gulp.plugin import PluginBase
16from gulp.plugin_internal import GulpPluginOption, GulpPluginParams
17
18try:
19 from aiocsv import AsyncDictReader
20except Exception:
21 muty.os.install_package("aiocsv")
22 from aiocsv import AsyncDictReader
23
24
26 """
27 CSV generic file processor
28
29 the CSV plugin can ingest any CSV file on its own, but it is also used as a base plugin by other plugins (in "stacked" mode).
30
31 ### standalone mode
32
33 when used by itself, the plugin can ingest a CSV file with the default settings (no extra parameters needed), except for the timestamp source described in the NOTE below.
34
35 NOTE: since each document stored on elasticsearch must have a "@timestamp", either a mapping file must be provided or "timestamp_field" must be set to the name of a field in the CSV file.
36
37 ~~~bash
38 # all CSV fields will result in "gulp.unmapped.*" fields; the timestamp will be set from the "UpdateTimestamp" field
39 TEST_PLUGIN_PARAMS='{"timestamp_field": "UpdateTimestamp"}' TEST_PLUGIN=csv ./test_scripts/test_ingest.sh -p ./samples/mftecmd/sample_j.csv
40
41 # use a mapping file
42 # a mapping file may hold more than one mapping definition, each with its own options (as defined in helpers.get_mapping_from_file())
43 TEST_PLUGIN_PARAMS='{"mapping_file": "mftecmd_csv.json", "mapping_id": "j"}' TEST_PLUGIN=csv ./test_scripts/test_ingest.sh -p ./samples/mftecmd/sample_j.csv
44 ~~~
45
46 ### stacked mode
47
48 in stacked mode, we simply run the stacked plugin, which in turn uses the CSV plugin to parse the data.
49
50 ~~~bash
51 TEST_PLUGIN=stacked_example ./test_scripts/test_ingest.sh -p ./samples/mftecmd/sample_j.csv
52 ~~~
53
54 see the example in [stacked_example.py](stacked_example.py)
55
56 ### parameters
57
58 the CSV plugin supports the following custom parameters in the plugin_params.extra dictionary:
59
60 - `delimiter`: set the delimiter for the CSV file (default=",")
61
62
63 """
64
def type(self) -> GulpPluginType:
    """
    return the kind of this plugin: the CSV plugin is an ingestion plugin.
    """
    plugin_type = GulpPluginType.INGESTION
    return plugin_type
67
def desc(self) -> str:
    """
    return the short, human readable description of this plugin.
    """
    description = "generic CSV file processor"
    return description
70
def options(self) -> list[GulpPluginOption]:
    """
    return the custom options understood by this plugin.

    currently only "delimiter" (a str, default ",") which is read from
    plugin_params.extra by ingest().
    """
    delimiter_option = GulpPluginOption(
        "delimiter",
        "str",
        "delimiter for the CSV file",
        default=",",
    )
    return [delimiter_option]
77
def name(self) -> str:
    """
    return the plugin name.
    """
    plugin_name = "csv"
    return plugin_name
80
def version(self) -> str:
    """
    return the plugin version string.
    """
    plugin_version = "1.0"
    return plugin_version
83
async def record_to_gulp_document(
    self,
    operation_id: int,
    client_id: int,
    context: str,
    source: str,
    fs: TmpIngestStats,
    record: dict,
    record_idx: int,
    custom_mapping: GulpMapping = None,
    index_type_mapping: dict = None,
    plugin: str = None,
    plugin_params: GulpPluginParams = None,
    **kwargs,
) -> list[GulpDocument]:
    """
    convert a single CSV record into gulp documents.

    when called from ingest(), the record is a dict of column name -> value which also
    holds the (reconstructed) raw CSV line under the "__line__" key: that key is used as
    the raw event and is excluded from field mapping. the caller's dict is NOT modified.

    Args:
        operation_id (int): passed to self._build_gulpdocuments().
        client_id (int): passed to self._build_gulpdocuments().
        context (str): passed to self._build_gulpdocuments().
        source (str): path of the ingested file; only its basename is stored (src_file).
        fs (TmpIngestStats): ingestion stats (not used here, part of the plugin interface).
        record (dict): the record to convert; must contain "__line__".
        record_idx (int): record index, used both as idx and original_id.
        custom_mapping (GulpMapping, optional): mapping used for field mapping; its
            options.default_event_code, if set, is the default event code.
        index_type_mapping (dict, optional): passed to self._map_source_key().
        plugin (str, optional): plugin name passed to self._build_gulpdocuments().
        plugin_params (GulpPluginParams, optional): passed to self._map_source_key().
        **kwargs: forwarded to self._map_source_key().

    Returns:
        list[GulpDocument]: the documents built by self._build_gulpdocuments().

    Raises:
        KeyError: if record has no "__line__" key.
    """
    # work on a shallow copy, so the caller's record keeps its "__line__"
    # (ingest() hands the same dict to _record_failed() on errors)
    event: dict = dict(record)
    raw_text: str = event.pop("__line__")

    # map every remaining column through the (optional) custom mapping
    fme: list[FieldMappingEntry] = []
    for k, v in event.items():
        fme.extend(
            self._map_source_key(
                plugin_params,
                custom_mapping,
                k,
                v,
                index_type_mapping=index_type_mapping,
                **kwargs,
            )
        )

    # this is the global event code for this mapping, but it may be overridden
    # at field level, anyway
    event_code = (
        custom_mapping.options.default_event_code if custom_mapping is not None else None
    )
    return self._build_gulpdocuments(
        fme,
        idx=record_idx,
        operation_id=operation_id,
        context=context,
        plugin=plugin,
        client_id=client_id,
        raw_event=raw_text,
        event_code=event_code,
        original_id=record_idx,
        src_file=os.path.basename(source),
    )
141
@staticmethod
def _csv_row_to_event(row: dict, delimiter: str) -> dict:
    """
    build the event dict for one AsyncDictReader row.

    - columns whose value is None or empty are left out; a unicode BOM (as removed by
      muty.string.remove_unicode_bom()) is stripped from the kept column names.
    - the row values, in column order, are joined with the delimiter (None -> "") and
      stored under the "__line__" key as the raw event text. this is a reconstruction:
      the original CSV quoting is not preserved.
    - a row longer than the header has its extra values stored by the reader as a list
      (under the restkey, None by default, assuming AsyncDictReader mirrors
      csv.DictReader): those values are kept in "__line__" only, since there is no
      column name to map them to.
    """
    event: dict = {}
    values: list[str] = []
    for key, value in row.items():
        if isinstance(value, list):
            # overflow fields of a malformed row: raw line only
            values.extend("" if v is None else v for v in value)
            continue
        if value:
            event[muty.string.remove_unicode_bom(key)] = value
        values.append("" if value is None else value)

    event["__line__"] = delimiter.join(values)
    return event

async def ingest(
    self,
    index: str,
    req_id: str,
    client_id: int,
    operation_id: int,
    context: str,
    source: str | list[dict],
    ws_id: str,
    plugin_params: GulpPluginParams = None,
    flt: GulpIngestionFilter = None,
    **kwargs,
) -> GulpRequestStatus:
    """
    ingest a CSV file.

    the file is read with AsyncDictReader (the first row is the header); each row is
    turned into an event dict by _csv_row_to_event() and converted/stored through
    self._process_record() using self.record_to_gulp_document().

    a failure on a single row (including a malformed row) is recorded with
    self._record_failed() and ingestion continues with the next row; a failure opening
    or reading the file is recorded with self._parser_failed().

    Args:
        index (str): passed to the base ingest(), mapping initialization, record
            processing and _finish_ingestion().
        req_id (str): passed through to record processing and _finish_ingestion().
        client_id (int): passed through to record processing and _finish_ingestion().
        operation_id (int): passed through to record processing.
        context (str): passed through to record processing.
        source (str | list[dict]): this implementation opens it as a file path (utf-8).
        ws_id (str): passed through to record processing and _finish_ingestion().
        plugin_params (GulpPluginParams, optional): plugin parameters; the CSV delimiter
            is read from plugin_params.extra["delimiter"] (default ",").
        flt (GulpIngestionFilter, optional): passed through to record processing and
            _finish_ingestion().
        **kwargs: forwarded to the base ingest() and to self._process_record().

    Returns:
        GulpRequestStatus: the result of self._finish_ingestion().
    """
    await super().ingest(
        index=index,
        req_id=req_id,
        client_id=client_id,
        operation_id=operation_id,
        context=context,
        source=source,
        ws_id=ws_id,
        plugin_params=plugin_params,
        flt=flt,
        **kwargs,
    )

    fs = TmpIngestStats(source)

    # initialize mapping
    index_type_mapping, custom_mapping = await self.ingest_plugin_initialize(
        index, source, plugin_params=plugin_params
    )

    # check plugin_params
    try:
        custom_mapping, plugin_params = self._process_plugin_params(
            custom_mapping, plugin_params
        )
    except InvalidArgument as ex:
        fs = self._parser_failed(fs, source, ex)
        return await self._finish_ingestion(
            index, source, req_id, client_id, ws_id, fs=fs, flt=flt
        )

    self.logger().debug("custom_mapping=%s" % (custom_mapping))

    delimiter = plugin_params.extra.get("delimiter", ",")

    # the mapping may override the plugin name stored in the generated documents
    plugin = self.name()
    if custom_mapping is not None and custom_mapping.options.agent_type is not None:
        plugin = custom_mapping.options.agent_type
    self.logger().warning("using plugin name=%s" % (plugin))

    ev_idx = 0
    try:
        async with aiofiles.open(
            source, mode="r", encoding="utf-8", newline=""
        ) as f:
            async for line_dict in AsyncDictReader(f, delimiter=delimiter):
                # fall back to the raw reader row for error reporting, in case the
                # row cannot even be turned into an event dict
                fixed_dict = line_dict
                try:
                    fixed_dict = self._csv_row_to_event(line_dict, delimiter)

                    # convert record to gulp document
                    fs, must_break = await self._process_record(
                        index,
                        fixed_dict,
                        ev_idx,
                        self.record_to_gulp_document,
                        ws_id,
                        req_id,
                        operation_id,
                        client_id,
                        context,
                        source,
                        fs,
                        custom_mapping=custom_mapping,
                        index_type_mapping=index_type_mapping,
                        plugin=plugin,
                        plugin_params=plugin_params,
                        flt=flt,
                        **kwargs,
                    )
                    ev_idx += 1
                    if must_break:
                        break

                except Exception as ex:
                    fs = self._record_failed(fs, fixed_dict, source, ex)

    except Exception as ex:
        # add an error
        fs = self._parser_failed(fs, source, ex)

    # done
    return await self._finish_ingestion(
        index, source, req_id, client_id, ws_id, fs=fs, flt=flt
    )
GulpPluginType type(self)
Definition csv.py:65
str desc(self)
Definition csv.py:68