diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 404c92a..2783015 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -17,7 +17,6 @@ #include "CCommon.h" #include "CMessage.h" #include "CMessageExt.h" -#include "CBatchMessage.h" #include "CSendResult.h" #include "CProducer.h" #include "CPushConsumer.h" @@ -33,6 +32,8 @@ const char *VERSION = "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " "; map> g_CallBackMap; +map g_TransactionCheckCallBackMap; + class PyThreadStateLock { public: @@ -96,18 +97,6 @@ int PySetMessageDelayTimeLevel(void *msg, int level) { return SetDelayTimeLevel((CMessage *) msg, level); } -//batch message -void *PyCreateBatchMessage() { - return (void *) CreateBatchMessage(); -} - -int PyAddMessage(void *batchMsg, void *msg) { - return AddMessage((CBatchMessage *) batchMsg, (CMessage *) msg); -} - -int PyDestroyBatchMessage(void *batchMsg) { - return DestroyBatchMessage((CBatchMessage *) batchMsg); -} //messageExt const char *PyGetMessageTopic(PyMessageExt msgExt) { @@ -134,6 +123,27 @@ void *PyCreateProducer(const char *groupId) { PyEval_InitThreads(); // ensure create GIL, for call Python callback from C. return (void *) CreateProducer(groupId); } + +void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback) { + PyEval_InitThreads(); + CProducer *producer = CreateTransactionProducer(groupId, &PyLocalTransactionCheckerCallback, NULL); + g_TransactionCheckCallBackMap[producer] = localTransactionCheckerCallback; + return producer; +} + +CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data) { + PyThreadStateLock pyThreadLock; // ensure hold GIL, before call python callback + PyMessageExt message = {.pMessageExt = msg}; + map::iterator iter; + iter = g_TransactionCheckCallBackMap.find(producer); + if (iter != g_TransactionCheckCallBackMap.end()) { + PyObject *pCallback = iter->second; + CTransactionStatus status = boost::python::call(pCallback, message); + return status; + } + return CTransactionStatus::E_UNKNOWN_TRANSACTION; +} + int PyDestroyProducer(void *producer) { return DestroyProducer((CProducer *) producer); } @@ -190,23 +200,23 @@ int PySendMessageOneway(void *producer, void *msg) { return SendMessageOneway((CProducer *) producer, (CMessage *) msg); } -void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){ +void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback) { PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback PySendResult sendResult; sendResult.sendStatus = result.sendStatus; sendResult.offset = result.offset; strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; - PyCallback *callback = (PyCallback *)pyCallback; + PyCallback *callback = (PyCallback *) pyCallback; boost::python::call(callback->successCallback, sendResult, (void *) msg); delete pyCallback; } -void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ +void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback) { PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback PyMQException exception; - PyCallback *callback = (PyCallback *)pyCallback; + PyCallback *callback = (PyCallback *) pyCallback; exception.error = e.error; exception.line = e.line; strncpy(exception.file, e.file, MAX_EXEPTION_FILE_LENGTH - 1); @@ -219,17 +229,21 @@ void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ delete pyCallback; } -int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){ - PyCallback* pyCallback = new PyCallback(); +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback) { + PyCallback *pyCallback = new PyCallback(); pyCallback->successCallback = sendSuccessCallback; pyCallback->exceptionCallback = sendExceptionCallback; - return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, (void *)pyCallback); + return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, + (void *) pyCallback); } -PySendResult PySendBatchMessage(void *producer, void *batchMessage) { + +PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) { PySendResult ret; CSendResult result; - SendBatchMessage((CProducer *) producer, (CBatchMessage *) batchMessage, &result); + PyUserData userData = {queueSelector, args}; + SendMessageOrderly((CProducer *) producer, (CMessage *) msg, &PyOrderlyCallbackInner, &userData, autoRetryTimes, + &result); ret.sendStatus = result.sendStatus; ret.offset = result.offset; strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); @@ -237,12 +251,16 @@ PySendResult PySendBatchMessage(void *producer, void *batchMessage) { return ret; } +int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) { + PyUserData *userData = (PyUserData *) args; + int index = boost::python::call(userData->pyObject, size, (void *) msg, userData->pData); + return index; +} -PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) { +PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) { PySendResult ret; CSendResult result; - PyUserData userData = {queueSelector,args}; - SendMessageOrderly((CProducer *) producer, (CMessage *) msg, &PyOrderlyCallbackInner, &userData, autoRetryTimes, &result); + SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result); ret.sendStatus = result.sendStatus; ret.offset = result.offset; strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); @@ -250,16 +268,19 @@ PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, return ret; } -int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) { - PyUserData *userData = (PyUserData *)args; - int index = boost::python::call(userData->pyObject, size, (void *) msg, userData->pData); - return index; +CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessage *msg, void *data) { + PyUserData *localCallback = (PyUserData *) data; + CTransactionStatus status = boost::python::call(localCallback->pyObject, (void *) msg, + localCallback->pData); + return status; } -PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) { +PySendResult PySendMessageInTransaction(void *producer, void *msg, PyObject *localTransactionCallback, void *args) { + PyUserData userData = {localTransactionCallback, args}; PySendResult ret; CSendResult result; - SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result); + SendMessageTransaction((CProducer *) producer, (CMessage *) msg, &PyLocalTransactionExecuteCallback, &userData, + &result); ret.sendStatus = result.sendStatus; ret.offset = result.offset; strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); @@ -286,6 +307,15 @@ int PyDestroyPushConsumer(void *consumer) { } return DestroyPushConsumer(consumerInner); } +int PyDestroyTransactionProducer(void *producer) { + CProducer *producerInner = (CProducer *) producer; + map::iterator iter; + iter = g_TransactionCheckCallBackMap.find(producerInner); + if (iter != g_TransactionCheckCallBackMap.end()) { + g_TransactionCheckCallBackMap.erase(iter); + } + return DestroyProducer(producerInner); +} int PyStartPushConsumer(void *consumer) { return StartPushConsumer((CPushConsumer *) consumer); } @@ -308,7 +338,7 @@ int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args) return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner); } -int PyRegisterMessageCallbackOrderly(void *consumer, PyObject *pCallback, object args){ +int PyRegisterMessageCallbackOrderly(void *consumer, PyObject *pCallback, object args) { CPushConsumer *consumerInner = (CPushConsumer *) consumer; g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args)); return RegisterMessageCallbackOrderly(consumerInner, &PythonMessageCallBackInner); @@ -418,6 +448,10 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { .value("E_LOG_LEVEL_TRACE", E_LOG_LEVEL_TRACE) .value("E_LOG_LEVEL_LEVEL_NUM", E_LOG_LEVEL_LEVEL_NUM); + enum_("TransactionStatus") + .value("E_COMMIT_TRANSACTION", E_COMMIT_TRANSACTION) + .value("E_ROLLBACK_TRANSACTION", E_ROLLBACK_TRANSACTION) + .value("E_UNKNOWN_TRANSACTION", E_UNKNOWN_TRANSACTION); //For Message def("CreateMessage", PyCreateMessage, return_value_policy()); @@ -430,11 +464,6 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SetMessageProperty", PySetMessageProperty); def("SetDelayTimeLevel", PySetMessageDelayTimeLevel); - //For batch message - def("CreateBatchMessage", PyCreateBatchMessage, return_value_policy()); - def("AddMessage", PyAddMessage); - def("DestroyBatchMessage", PyDestroyBatchMessage); - //For MessageExt def("GetMessageTopic", PyGetMessageTopic); def("GetMessageTags", PyGetMessageTags); @@ -445,7 +474,9 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { //For producer def("CreateProducer", PyCreateProducer, return_value_policy()); + def("CreateTransactionProducer", PyCreateTransactionProducer, return_value_policy()); def("DestroyProducer", PyDestroyProducer); + def("DestroyTransactionProducer", PyDestroyTransactionProducer); def("StartProducer", PyStartProducer); def("ShutdownProducer", PyShutdownProducer); def("SetProducerNameServerAddress", PySetProducerNameServerAddress); @@ -462,11 +493,11 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SendMessageSync", PySendMessageSync); def("SendMessageAsync", PySendMessageAsync); - def("SendBatchMessage", PySendBatchMessage); def("SendMessageOneway", PySendMessageOneway); def("SendMessageOrderly", PySendMessageOrderly); def("SendMessageOrderlyByShardingKey", PySendMessageOrderlyByShardingKey); + def("SendMessageInTransaction", PySendMessageInTransaction); //For Consumer def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy()); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 29a4952..732320b 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -18,7 +18,6 @@ #include "CCommon.h" #include "CMessage.h" #include "CMessageExt.h" -#include "CBatchMessage.h" #include "CSendResult.h" #include "CProducer.h" #include "CPushConsumer.h" @@ -91,11 +90,6 @@ int PySetByteMessageBody(void *msg, const char *body, int len); int PySetMessageProperty(void *msg, const char *key, const char *value); int PySetMessageDelayTimeLevel(void *msg, int level); -//batch message -void *PyCreateBatchMessage(); -int PyAddMessage(void *batchMsg, void *msg); -int PyDestroyBatchMessage(void *batchMsg); - //messageExt const char *PyGetMessageTopic(PyMessageExt msgExt); const char *PyGetMessageTags(PyMessageExt msgExt); @@ -106,7 +100,12 @@ const char *PyGetMessageId(PyMessageExt msgExt); //producer void *PyCreateProducer(const char *groupId); +CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data); +CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessage *msg, void *data); +void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback); + int PyDestroyProducer(void *producer); +int PyDestroyTransactionProducer(void *producer); int PyStartProducer(void *producer); int PyShutdownProducer(void *producer); int PySetProducerNameServerAddress(void *producer, const char *namesrv); @@ -127,9 +126,9 @@ void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback); void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback); int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback); -PySendResult PySendBatchMessage(void *producer, void *msg); PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey); +PySendResult PySendMessageInTransaction(void *producer , void *msg, PyObject *localTransactionExecuteCallback , void *args); int PyOrderlyCallbackInner(int size, CMessage *msg, void *args); diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index 9baf78e..179e1f1 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -31,8 +31,18 @@ def init_producer(): StartProducer(producer) return producer +def transaction_local_checker(msg): + print 'begin check for msg: ' + GetMessageId(msg) + return TransactionStatus.E_COMMIT_TRANSACTION -producer = init_producer() +def init_transaction_producer(): + producer = CreateTransactionProducer('TransactionTestProducer', transaction_local_checker) + SetProducerLogLevel(producer, CLogLevel.E_LOG_LEVEL_INFO) + SetProducerNameServerAddress(producer, name_srv) + StartProducer(producer) + return producer + +producer = init_transaction_producer() tag = 'rmq-tag' key = 'rmq-key' @@ -257,24 +267,24 @@ def send_message_async_fail(msg, exception): print 'send message failed' print 'error msg: ' + exception.GetMsg() -def send_batch_message(batch_count): +def send_transaction_message(count): key = 'rmq-key' - print 'start send batch message' + print 'start send transaction message' tag = 'test' - batchMsg = CreateBatchMessage() - for n in range(count): body = 'hi rmq message, now is' + str(n) msg = CreateMessage(topic) SetMessageBody(msg, body) SetMessageKeys(msg, key) SetMessageTags(msg, tag) - AddMessage(batchMsg, msg) - DestroyMessage(msg) - SendBatchMessage(producer, batchMsg) - DestroyBatchMessage(batchMsg) - print 'send batch message done' + SendMessageInTransaction(producer, msg, transaction_local_execute, None) + print 'send transaction message done' + time.sleep(10000) + +def transaction_local_execute(msg, args): + print 'begin execute local transaction' + return TransactionStatus.E_UNKNOWN_TRANSACTION if __name__ == '__main__': - send_message_async(10) + send_transaction_message(10)