Skip to content

Commit b284e73

Browse files
authored
feat: add logtail pipeline config (#344)
* feat: add logtail pipeline config support * fix warning * bump version 0.9.35 * add sample
1 parent 72117a8 commit b284e73

File tree

7 files changed

+693
-5
lines changed

7 files changed

+693
-5
lines changed

aliyun/log/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from .listtopicsrequest import ListTopicsRequest
88
from .listlogstoresrequest import ListLogstoresRequest
99
from .logtail_config_detail import *
10+
from .logtail_pipeline_config_detail import *
11+
from .logtail_pipeline_config_response import *
1012
from .machine_group_detail import MachineGroupDetail
1113
from .putlogsrequest import PutLogsRequest
1214
from .shipper_config import ShipperTask, OssShipperConfig, OdpsShipperConfig

aliyun/log/etl_core/trans_comp/trans_kv.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@ class kv_transformer(trans_comp_check_mdoe_base):
3232
@staticmethod
3333
@cached
3434
def _get_kv_ptn(sep, quote, escape):
35-
p1 = u'(?!{0})([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)\\s*{0}\\s*([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)'
35+
# Use Unicode characters directly to avoid escape sequence warnings
36+
# \u4e00-\u9fa5 = Chinese characters, \u0800-\u4e00 = other Unicode range
37+
p1 = r'(?!{0})([' + '\u4e00-\u9fa5\u0800-\u4e00' + r'\w\.\-]+)\s*{0}\s*([' + '\u4e00-\u9fa5\u0800-\u4e00' + r'\w\.\-]+)'
3638
if not escape:
37-
p2 = u'(?!{0})([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)\\s*{0}\\s*{1}\s*([^{1}]*?)\s*{1}'
39+
p2 = r'(?!{0})([' + '\u4e00-\u9fa5\u0800-\u4e00' + r'\w\.\-]+)\s*{0}\s*{1}\s*([^{1}]*?)\s*{1}'
3840
else:
39-
p2 = u'(?!{0})([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)\\s*{0}\\s*{1}\s*((?:[^{1}]|\\\\{1})*?[^\\\\]){1}'
40-
ps = u'|'.join([p1, p2]).format(sep, quote)
41+
p2 = r'(?!{0})([' + '\u4e00-\u9fa5\u0800-\u4e00' + r'\w\.\-]+)\s*{0}\s*{1}\s*((?:[^{1}]|\\{1})*?[^\\]){1}'
42+
ps = '|'.join([p1, p2]).format(sep, quote)
4143

4244
logger.info(u"trans_comp_kv: get ptn: {0}".format(ps))
4345
return re.compile(ps)

aliyun/log/logclient.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from .logstore_config_response import *
4242
from .substore_config_response import *
4343
from .logtail_config_response import *
44+
from .logtail_pipeline_config_response import *
4445
from .machinegroup_response import *
4546
from .rebuild_index_response import *
4647
from .project_response import *
@@ -2328,6 +2329,132 @@ def list_logtail_config(self, project_name, logstore=None, config=None, offset=0
23282329
(resp, header) = self._send("GET", project_name, None, resource, params, headers)
23292330
return ListLogtailConfigResponse(resp, header)
23302331

2332+
def create_logtail_pipeline_config(self, project_name, config_detail):
2333+
""" create logtail pipeline config in a project
2334+
Unsuccessful operation will cause an LogException.
2335+
2336+
:type project_name: string
2337+
:param project_name: the Project name
2338+
2339+
:type config_detail: LogtailPipelineConfigDetail
2340+
:param config_detail: the logtail pipeline config detail
2341+
2342+
:return: CreateLogtailPipelineConfigResponse
2343+
2344+
:raise: LogException
2345+
"""
2346+
headers = {}
2347+
params = {}
2348+
resource = "/pipelineconfigs"
2349+
headers['Content-Type'] = 'application/json'
2350+
body = six.b(json.dumps(config_detail.to_json()))
2351+
headers['x-log-bodyrawsize'] = str(len(body))
2352+
(resp, headers) = self._send("POST", project_name, body, resource, params, headers)
2353+
return CreateLogtailPipelineConfigResponse(headers, resp)
2354+
2355+
def update_logtail_pipeline_config(self, project_name, config_detail):
2356+
""" update logtail pipeline config in a project
2357+
Unsuccessful operation will cause an LogException.
2358+
2359+
:type project_name: string
2360+
:param project_name: the Project name
2361+
2362+
:type config_detail: LogtailPipelineConfigDetail
2363+
:param config_detail: the logtail pipeline config detail
2364+
2365+
:return: UpdateLogtailPipelineConfigResponse
2366+
2367+
:raise: LogException
2368+
"""
2369+
headers = {}
2370+
params = {}
2371+
resource = "/pipelineconfigs/" + config_detail.config_name
2372+
headers['Content-Type'] = 'application/json'
2373+
body = six.b(json.dumps(config_detail.to_json()))
2374+
headers['x-log-bodyrawsize'] = str(len(body))
2375+
(resp, headers) = self._send("PUT", project_name, body, resource, params, headers)
2376+
return UpdateLogtailPipelineConfigResponse(headers, resp)
2377+
2378+
def delete_logtail_pipeline_config(self, project_name, config_name):
2379+
""" delete logtail pipeline config in a project
2380+
Unsuccessful operation will cause an LogException.
2381+
2382+
:type project_name: string
2383+
:param project_name: the Project name
2384+
2385+
:type config_name: string
2386+
:param config_name: the logtail pipeline config name
2387+
2388+
:return: DeleteLogtailPipelineConfigResponse
2389+
2390+
:raise: LogException
2391+
"""
2392+
headers = {}
2393+
params = {}
2394+
resource = "/pipelineconfigs/" + config_name
2395+
(resp, headers) = self._send("DELETE", project_name, None, resource, params, headers)
2396+
return DeleteLogtailPipelineConfigResponse(headers, resp)
2397+
2398+
def get_logtail_pipeline_config(self, project_name, config_name):
2399+
""" get logtail pipeline config in a project
2400+
Unsuccessful operation will cause an LogException.
2401+
2402+
:type project_name: string
2403+
:param project_name: the Project name
2404+
2405+
:type config_name: string
2406+
:param config_name: the logtail pipeline config name
2407+
2408+
:return: GetLogtailPipelineConfigResponse
2409+
2410+
:raise: LogException
2411+
"""
2412+
headers = {}
2413+
params = {}
2414+
resource = "/pipelineconfigs/" + config_name
2415+
(resp, headers) = self._send("GET", project_name, None, resource, params, headers)
2416+
return GetLogtailPipelineConfigResponse(resp, headers)
2417+
2418+
def list_logtail_pipeline_config(self, project_name, config_name=None, logstore_name=None, offset=0, size=100):
2419+
""" list logtail pipeline config names in a project
2420+
Unsuccessful operation will cause an LogException.
2421+
2422+
:type project_name: string
2423+
:param project_name: the Project name
2424+
2425+
:type config_name: string
2426+
:param config_name: config name to filter
2427+
2428+
:type logstore_name: string
2429+
:param logstore_name: logstore name to filter
2430+
2431+
:type offset: int
2432+
:param offset: the offset of all config names
2433+
2434+
:type size: int
2435+
:param size: the max return names count, -1 means all
2436+
2437+
:return: ListLogtailPipelineConfigResponse
2438+
2439+
:raise: LogException
2440+
"""
2441+
# need to use extended method to get more
2442+
if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE:
2443+
return list_more(self.list_logtail_pipeline_config, int(offset), int(size), MAX_LIST_PAGING_SIZE,
2444+
project_name, config_name, logstore_name)
2445+
2446+
headers = {}
2447+
params = {}
2448+
resource = "/pipelineconfigs"
2449+
params['offset'] = str(offset)
2450+
params['size'] = str(size)
2451+
if config_name:
2452+
params['configName'] = config_name
2453+
if logstore_name:
2454+
params['logstoreName'] = logstore_name
2455+
(resp, header) = self._send("GET", project_name, None, resource, params, headers)
2456+
return ListLogtailPipelineConfigResponse(resp, header)
2457+
23312458
def create_machine_group(self, project_name, group_detail):
23322459
""" create machine group in a project
23332460
Unsuccessful operation will cause an LogException.
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#!/usr/bin/env python
2+
# encoding: utf-8
3+
4+
# Copyright (C) Alibaba Cloud Computing
5+
# All rights reserved.
6+
7+
"""
8+
Logtail Pipeline Config Detail Classes
9+
10+
This module provides classes for managing Logtail pipeline configurations.
11+
"""
12+
13+
from __future__ import print_function
14+
import json
15+
16+
__all__ = ['LogtailPipelineConfigDetail']
17+
18+
19+
class LogtailPipelineConfigDetail(object):
20+
"""Logtail Pipeline Config Detail
21+
22+
:type config_name: string
23+
:param config_name: config name, can only contain lowercase letters, numbers, - and _,
24+
must start and end with lowercase letter or number, length between 2-128 characters
25+
26+
:type log_sample: string
27+
:param log_sample: log sample, supports multiple log entries
28+
29+
:type global_config: dict
30+
:param global_config: global configuration
31+
32+
:type inputs: list
33+
:param inputs: input plugin list, currently only allows 1 input plugin
34+
35+
:type processors: list
36+
:param processors: processor plugin list
37+
38+
:type aggregators: list
39+
:param aggregators: aggregator plugin list, maximum 1 aggregator plugin allowed
40+
41+
:type flushers: list
42+
:param flushers: flusher plugin list, currently only allows 1 flusher_sls plugin
43+
"""
44+
45+
def __init__(self, config_name, inputs, flushers, log_sample=None,
46+
global_config=None, processors=None, aggregators=None):
47+
"""Initialize a Logtail Pipeline Config Detail
48+
49+
:type config_name: string
50+
:param config_name: config name
51+
52+
:type inputs: list
53+
:param inputs: input plugin list
54+
55+
:type flushers: list
56+
:param flushers: flusher plugin list
57+
58+
:type log_sample: string
59+
:param log_sample: log sample (optional)
60+
61+
:type global_config: dict
62+
:param global_config: global configuration (optional)
63+
64+
:type processors: list
65+
:param processors: processor plugin list (optional)
66+
67+
:type aggregators: list
68+
:param aggregators: aggregator plugin list (optional)
69+
"""
70+
self.config_name = config_name
71+
self.log_sample = log_sample
72+
self.global_config = global_config if global_config is not None else {}
73+
self.inputs = inputs if inputs is not None else []
74+
self.processors = processors if processors is not None else []
75+
self.aggregators = aggregators if aggregators is not None else []
76+
self.flushers = flushers if flushers is not None else []
77+
78+
def to_json(self):
79+
"""Convert to JSON object
80+
81+
:return: dict object ready for JSON serialization
82+
"""
83+
json_value = {
84+
"configName": self.config_name
85+
}
86+
87+
# Add optional fields
88+
if self.log_sample:
89+
json_value["logSample"] = self.log_sample
90+
91+
if self.global_config:
92+
json_value["global"] = self.global_config
93+
94+
# Required fields
95+
json_value["inputs"] = self.inputs
96+
json_value["flushers"] = self.flushers
97+
98+
# Optional fields
99+
if self.processors:
100+
json_value["processors"] = self.processors
101+
102+
if self.aggregators:
103+
json_value["aggregators"] = self.aggregators
104+
105+
return json_value
106+
107+
@staticmethod
108+
def from_json(json_value):
109+
"""Create LogtailPipelineConfigDetail from JSON object
110+
111+
:type json_value: dict
112+
:param json_value: JSON object
113+
114+
:return: LogtailPipelineConfigDetail object
115+
"""
116+
config_name = json_value.get("configName", "")
117+
log_sample = json_value.get("logSample", None)
118+
global_config = json_value.get("global", None)
119+
inputs = json_value.get("inputs", [])
120+
processors = json_value.get("processors", None)
121+
aggregators = json_value.get("aggregators", None)
122+
flushers = json_value.get("flushers", [])
123+
124+
return LogtailPipelineConfigDetail(
125+
config_name=config_name,
126+
inputs=inputs,
127+
flushers=flushers,
128+
log_sample=log_sample,
129+
global_config=global_config,
130+
processors=processors,
131+
aggregators=aggregators
132+
)
133+
134+
135+

0 commit comments

Comments
 (0)