Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions aliyun/log/async_sql.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto2";

message AsyncSqlMetaPB
{
optional int32 result_rows = 1;
optional int64 processed_rows = 2;
optional int64 processed_bytes = 3;
optional int64 elapsed_milli = 4;
optional double cpu_sec = 5;
optional int64 cpu_cores = 6;
optional string progress = 7;
repeated string keys = 8; // column names
}

message AsyncSqlRowPB
{
repeated string columns = 1;
}

message AsyncSqlResponsePB
{
required string id = 1;
required string state = 2;
optional AsyncSqlMetaPB meta = 3;
repeated AsyncSqlRowPB rows = 4;
optional string error_code = 5;
optional string error_message = 6;
}
29 changes: 29 additions & 0 deletions aliyun/log/async_sql_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

165 changes: 165 additions & 0 deletions aliyun/log/async_sql_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#!/usr/bin/env python
# encoding: utf-8

# Copyright (C) Alibaba Cloud Computing
# All rights reserved.

from .logresponse import LogResponse
from .async_sql_pb2 import AsyncSqlResponsePB
from .compress import Compressor


class AsyncSqlResponse(LogResponse):
""" The response used for async SQL operations.

:type header: dict
:param header: response header

:type resp: string
:param resp: response data
"""

def __init__(self, header, resp):
LogResponse.__init__(self, header, resp)
self.async_sql_response_pb = AsyncSqlResponsePB()
if resp:
try:
raw_data = Compressor.decompress_response(header, resp)
self.async_sql_response_pb.ParseFromString(raw_data)
except Exception as ex:
raise Exception("Failed to parse AsyncSqlResponsePB: {0}".format(ex))

def get_query_id(self):
""" Get async SQL query ID

:return: string, query ID
"""
return self.async_sql_response_pb.id

def get_state(self):
""" Get async SQL query state

:return: string, query state (e.g., "RUNNING", "COMPLETE", "FAILED")
"""
return self.async_sql_response_pb.state

def get_error_code(self):
""" Get error code if query failed

:return: string, error code
"""
return self.async_sql_response_pb.error_code if self.async_sql_response_pb.HasField('error_code') else None

def get_error_message(self):
""" Get error message if query failed

:return: string, error message
"""
return self.async_sql_response_pb.error_message if self.async_sql_response_pb.HasField('error_message') else None

def get_meta(self):
""" Get query metadata

:return: AsyncSqlMetaPB, query metadata
"""
return self.async_sql_response_pb.meta if self.async_sql_response_pb.HasField('meta') else None

def get_result_rows(self):
""" Get number of result rows

:return: int, number of result rows
"""
meta = self.get_meta()
return meta.result_rows if meta and meta.HasField('result_rows') else 0

def get_processed_rows(self):
""" Get number of processed rows

:return: int, number of processed rows
"""
meta = self.get_meta()
return meta.processed_rows if meta and meta.HasField('processed_rows') else 0

def get_processed_bytes(self):
""" Get number of processed bytes

:return: int, number of processed bytes
"""
meta = self.get_meta()
return meta.processed_bytes if meta and meta.HasField('processed_bytes') else 0

def get_elapsed_milli(self):
""" Get elapsed time in milliseconds

:return: int, elapsed time in milliseconds
"""
meta = self.get_meta()
return meta.elapsed_milli if meta and meta.HasField('elapsed_milli') else 0

def get_cpu_sec(self):
""" Get CPU time in seconds

:return: float, CPU time in seconds
"""
meta = self.get_meta()
return meta.cpu_sec if meta and meta.HasField('cpu_sec') else 0.0

def get_cpu_cores(self):
""" Get CPU cores used

:return: int, CPU cores used
"""
meta = self.get_meta()
return meta.cpu_cores if meta and meta.HasField('cpu_cores') else 0

def get_progress(self):
""" Get query progress

:return: string, query progress
"""
meta = self.get_meta()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个可以优化下,用一个 common func,比如 get_or_default 这样更清晰一点

return meta.progress if meta and meta.HasField('progress') else ''

def get_keys(self):
""" Get column names/keys

:return: list, column names
"""
meta = self.get_meta()
return list(meta.keys) if meta else []

def get_rows(self):
""" Get result rows

:return: list, result rows data
"""
rows = []
for row_pb in self.async_sql_response_pb.rows:
rows.append(list(row_pb.columns))
return rows

def get_raw_response_pb(self):
""" Get raw protobuf response

:return: AsyncSqlResponsePB, raw protobuf response
"""
return self.async_sql_response_pb

def log_print(self):
""" Print response information for debugging
"""
print("AsyncSqlResponse:")
print(" Query ID: {0}".format(self.get_query_id()))
print(" State: {0}".format(self.get_state()))
print(" Result Rows: {0}".format(self.get_result_rows()))
print(" Processed Rows: {0}".format(self.get_processed_rows()))
print(" Processed Bytes: {0}".format(self.get_processed_bytes()))
print(" Elapsed Time: {0}ms".format(self.get_elapsed_milli()))
print(" CPU Time: {0}s".format(self.get_cpu_sec()))
print(" CPU Cores: {0}".format(self.get_cpu_cores()))
print(" Progress: {0}".format(self.get_progress()))
if self.get_error_code():
print(" Error Code: {0}".format(self.get_error_code()))
print(" Error Message: {0}".format(self.get_error_message()))
print(" Column Names: {0}".format(self.get_keys()))
print(" Data Rows: {0}".format(len(self.get_rows())))
37 changes: 37 additions & 0 deletions aliyun/log/delete_async_sql_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# encoding: utf-8

# Copyright (C) Alibaba Cloud Computing
# All rights reserved.

from .logrequest import LogRequest


class DeleteAsyncSqlRequest(LogRequest):
""" The request used to delete async SQL query.

:type project: string
:param project: project name

:type query_id: string
:param query_id: async SQL query ID
"""

def __init__(self, project=None, query_id=None):
LogRequest.__init__(self, project)
self.query_id = query_id

def get_query_id(self):
""" Get query ID

:return: string, query ID
"""
return self.query_id if self.query_id else ''

def set_query_id(self, query_id):
""" Set query ID

:type query_id: string
:param query_id: query ID
"""
self.query_id = query_id
75 changes: 75 additions & 0 deletions aliyun/log/get_async_sql_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python
# encoding: utf-8

# Copyright (C) Alibaba Cloud Computing
# All rights reserved.

from .logrequest import LogRequest


class GetAsyncSqlRequest(LogRequest):
""" The request used to get async SQL query results.

:type project: string
:param project: project name

:type query_id: string
:param query_id: async SQL query ID

:type offset: int
:param offset: line offset of return logs

:type line: int
:param line: max line number of return logs
"""

def __init__(self, project=None, query_id=None, offset=0, line=100):
LogRequest.__init__(self, project)
self.query_id = query_id
self.offset = offset
self.line = line

def get_query_id(self):
""" Get query ID

:return: string, query ID
"""
return self.query_id if self.query_id else ''

def set_query_id(self, query_id):
""" Set query ID

:type query_id: string
:param query_id: query ID
"""
self.query_id = query_id

def get_offset(self):
""" Get line offset of return logs

:return: int, line offset of return logs
"""
return self.offset

def set_offset(self, offset):
""" Set line offset of return logs

:type offset: int
:param offset: line offset of return logs
"""
self.offset = offset

def get_line(self):
""" Get max line number of return logs

:return: int, max line number of return logs
"""
return self.line

def set_line(self, line):
""" Set max line number of return logs

:type line: int
:param line: max line number of return logs
"""
self.line = line
Loading
Loading