- MMS client: refactoring for async connect, abort, release

pull/93/head
Michael Zillgith 7 years ago
parent 8d45d44cc2
commit 70c1cd691d

@ -562,17 +562,20 @@ IedConnection_connect(IedConnection self, IedClientError* error, const char* hos
if (IedConnection_getState(self) != IED_STATE_CONNECTED) {
MmsConnection_setConnectionLostHandler(self->connection, connectionLostHandler, (void*) self);
MmsConnection_setConnectionLostHandler(self->connection, NULL, NULL);
MmsConnection_setInformationReportHandler(self->connection, informationReportHandler, self);
MmsConnection_setConnectTimeout(self->connection, self->connectionTimeout);
if (MmsConnection_connect(self->connection, &mmsError, hostname, tcpPort)) {
MmsConnection_setConnectionLostHandler(self->connection, connectionLostHandler, (void*) self);
*error = IED_ERROR_OK;
IedConnection_setState(self, IED_STATE_CONNECTED);
}
else
else {
IedConnection_setState(self, IED_STATE_IDLE);
*error = iedConnection_mapMmsErrorToIedError(mmsError);
}
}
else
*error = IED_ERROR_ALREADY_CONNECTED;

@ -59,6 +59,13 @@ typedef struct {
char* revision;
} MmsServerIdentity;
typedef enum {
MMS_CONNECTION_STATE_CLOSED,
MMS_CONNECTION_STATE_CONNECTING,
MMS_CONNECTION_STATE_CONNECTED,
MMS_CONNECTION_STATE_CLOSING
} MmsConnectionState;
typedef void (*MmsInformationReportHandler) (void* parameter, char* domainName,
char* variableListName, MmsValue* value, bool isVariableListName);
@ -90,6 +97,20 @@ MmsConnection_create(void);
MmsConnection
MmsConnection_createSecure(TLSConfiguration tlsConfig);
/**
* \brief Create a new MmsConnection instance configured for non-threaded mode
*
* NOTE: This constructor doesn't create a background thread for connection handling.
* The user has to call the MmsConnection_tick function periodically to ensure that
* the MMS connection can be handled properly.
*
* \param tlsConfig TLS configuration parameters and certificates or NULL for non-TLS mode.
*
* \return the newly created instance.
*/
MmsConnection
MmsConnection_createNonThreaded(TLSConfiguration tlsConfig);
/**
* \brief Callback function to intercept raw MMS messages
*
@ -181,6 +202,11 @@ MmsConnection_getIsoConnectionParameters(MmsConnection self);
MmsConnectionParameters
MmsConnection_getMmsConnectionParameters(MmsConnection self);
typedef void (*MmsConnectionStateChangedHandler) (MmsConnection connection, void* parameter, MmsConnectionState newState);
void
MmsConnection_setConnectionStateChangedHandler(MmsConnection self, MmsConnectionStateChangedHandler handler, void* parameter);
/**
* \brief User provided handler function that will be called if the connection to the server is lost
*
@ -235,6 +261,23 @@ MmsConnection_destroy(MmsConnection self);
bool
MmsConnection_connect(MmsConnection self, MmsError* mmsError, const char* serverName, int serverPort);
void
MmsConnection_connectAsync(MmsConnection self, MmsError* mmsError, const char* serverName, int serverPort);
// return value indicates that connection is currently waiting and calling thread can be suspended
/**
* \brief Call MmsConnection state machine and connection handling code (for non-threaded mode only)
*
* This function has to be called periodically by the user application in non-threaded mode.
*
* \return true when connection is currently waiting and calling thread can be suspended, false means
* connection is busy and the tick function should be called again as soon as possible.
*/
bool
MmsConnection_tick(MmsConnection self);
/**
* \brief Close the connection - not recommended
*
@ -246,6 +289,9 @@ MmsConnection_connect(MmsConnection self, MmsError* mmsError, const char* server
void
MmsConnection_close(MmsConnection self);
typedef void
(*MmsConnection_ConcludeAbortHandler) (void* parameter, MmsError mmsError, bool success);
/**
* \brief Uses the MMS/ACSE abort service to close the connection to the server
*
@ -260,6 +306,9 @@ MmsConnection_close(MmsConnection self);
void
MmsConnection_abort(MmsConnection self, MmsError* mmsError);
void
MmsConnection_abortAsync(MmsConnection self, MmsError* mmsError);
/**
* \brief Uses the MMS conclude service to close the connection to the server
*
@ -274,6 +323,9 @@ MmsConnection_abort(MmsConnection self, MmsError* mmsError);
void
MmsConnection_conclude(MmsConnection self, MmsError* mmsError);
void
MmsConnection_concludeAsync(MmsConnection self, MmsError* mmsError, MmsConnection_ConcludeAbortHandler handler, void* parameter);
typedef void
(*MmsConnection_GenericServiceHandler) (int invokeId, void* parameter, MmsError mmsError, bool success);

@ -1,7 +1,7 @@
/*
* iso_client_connection.h
*
* This is an internal interface of MMSClientConnection that connects MMS to the ISO client
* This is an internal interface of MMS client connection that connects MMS to the ISO client
* protocol stack. It is used as an abstraction layer to isolate the MMS code from the lower
* protocol layers.
*
@ -53,15 +53,30 @@ typedef void*
typedef struct sIsoClientConnection* IsoClientConnection;
IsoClientConnection
IsoClientConnection_create(IsoIndicationCallback callback, void* callbackParameter);
IsoClientConnection_create(IsoConnectionParameters parameters, IsoIndicationCallback callback, void* callbackParameter);
void
IsoClientConnection_destroy(IsoClientConnection self);
bool
IsoClientConnection_associateAsync(IsoClientConnection self, uint32_t connectTimeoutInMs);
/**
* called by tick function
*
* \return value indicates that connection is currently waiting and calling thread can be suspended
*/
bool
IsoClientConnection_handleConnection(IsoClientConnection self);
void
IsoClientConnection_associate(IsoClientConnection self, IsoConnectionParameters params,
ByteBuffer* payload, uint32_t connectTimeoutInMs);
IsoClientConnection_associate(IsoClientConnection self, uint32_t connectTimeoutInMs);
/**
* Send message and release the transmit buffer
*
* \param payload message to send
*/
void
IsoClientConnection_sendMessage(IsoClientConnection self, ByteBuffer* payload);
@ -69,12 +84,10 @@ void
IsoClientConnection_release(IsoClientConnection self);
/**
* \brief Send ACSE abort message and wait until connection is closed by server or timeout occured
*
* \return true if abort has been successful, false indicates a timeout
* \brief Send ACSE abort message and wait until connection is closed by server or timeout occurred
*/
bool
IsoClientConnection_abort(IsoClientConnection self);
void
IsoClientConnection_abortAsync(IsoClientConnection self);
void
IsoClientConnection_close(IsoClientConnection self);
@ -93,14 +106,6 @@ IsoClientConnection_allocateTransmitBuffer(IsoClientConnection self);
void
IsoClientConnection_releaseTransmitBuffer(IsoClientConnection self);
/*
* The client should release the receive buffer in order for the IsoClientConnection to
* reuse the buffer! If this function is not called then the reception of messages is
* blocked!
*/
void
IsoClientConnection_releaseReceiveBuffer(IsoClientConnection self);
void*
IsoClientConnection_getSecurityToken(IsoClientConnection self);

@ -1,7 +1,7 @@
/*
* mms_msg_internal.h
*
* Copyright 2013 Michael Zillgith
* Copyright 2013-2018 Michael Zillgith
*
* This file is part of libIEC61850.
*
@ -42,12 +42,7 @@
#define DEBUG_MMS_CLIENT 0
#endif
typedef enum {
MMS_STATE_CLOSED,
MMS_STATE_CONNECTING,
MMS_STATE_CONNECTED
} AssociationState;
#if 0
typedef enum {
MMS_CON_IDLE,
MMS_CON_WAITING,
@ -55,6 +50,7 @@ typedef enum {
MMS_CON_ASSOCIATED,
MMS_CON_RESPONSE_PENDING
} ConnectionState;
#endif
#define CONCLUDE_STATE_CONNECTION_ACTIVE 0
#define CONCLUDE_STATE_REQUESTED 1
@ -102,12 +98,6 @@ struct sMmsConnection {
Semaphore lastInvokeIdLock;
uint32_t lastInvokeId;
Semaphore lastResponseLock;
volatile uint32_t responseInvokeId;
ByteBuffer* lastResponse;
volatile uint32_t lastResponseBufPos;
volatile MmsError lastResponseError;
Semaphore outstandingCallsLock;
MmsOutstandingCall outstandingCalls;
@ -116,30 +106,36 @@ struct sMmsConnection {
IsoClientConnection isoClient;
volatile AssociationState associationState;
Semaphore associationStateLock;
#if (CONFIG_MMS_THREADLESS_STACK == 0)
Thread connectionHandlingThread;
bool createThread;
bool connectionThreadRunning;
#endif
volatile ConnectionState connectionState;
Semaphore connectionStateLock;
volatile MmsConnectionState connectionState;
Semaphore associationStateLock;
MmsConnectionParameters parameters;
IsoConnectionParameters isoParameters;
MmsConnectionStateChangedHandler stateChangedHandler;
void* stateChangedHandlerParameter;
MmsInformationReportHandler reportHandler;
void* reportHandlerParameter;
MmsConnectionLostHandler connectionLostHandler;
void* connectionLostHandlerParameter;
MmsConnection_ConcludeAbortHandler concludeHandler;
void* concludeHandlerParameter;
uint64_t concludeTimeout;
#if (CONFIG_MMS_RAW_MESSAGE_LOGGING == 1)
void* rawMmsMessageHandler;
void* rawMmsMessageHandlerParameter;
#endif
/* state of an active connection conclude/release process */
volatile int concludeState;
Semaphore concludeStateLock;
#if (MMS_OBTAIN_FILE_SERVICE == 1)
int32_t nextFrsmId;
MmsFileReadStateMachine frsms[CONFIG_MMS_MAX_NUMBER_OF_OPEN_FILES_PER_CONNECTION];
@ -322,7 +318,7 @@ bool
mmsClient_parseFileDirectoryResponse(ByteBuffer* response, int bufPos, uint32_t invokeId, MmsConnection_FileDirectoryHandler handler, void* parameter);
bool
mmsClient_parseInitiateResponse(MmsConnection self);
mmsClient_parseInitiateResponse(MmsConnection self, ByteBuffer* response);
int
mmsClient_createConcludeRequest(MmsConnection self, ByteBuffer* message);

@ -3,7 +3,7 @@
*
* Client side representation of the ISO stack (COTP, session, presentation, ACSE)
*
* Copyright 2013 Michael Zillgith
* Copyright 2013-2018 Michael Zillgith
*
* This file is part of libIEC61850.
*
@ -47,21 +47,38 @@
#endif /* DEBUG_ISO_SERVER */
#define STATE_IDLE 0
#define STATE_ASSOCIATED 1
#define STATE_CONNECTED 1
#define STATE_ERROR 2
#define STATE_CONNECTING 3
#define TPKT_RFC1006_HEADER_SIZE 4
#define ISO_CLIENT_BUFFER_SIZE CONFIG_MMS_MAXIMUM_PDU_SIZE + 100
typedef enum {
INT_STATE_IDLE,
INT_STATE_TCP_CONNECTING,
INT_STATE_WAIT_FOR_COTP_CONNECT_RESP,
INT_STATE_WAIT_FOR_ACSE_RESP,
INT_STATE_WAIT_FOR_DATA_MSG,
INT_STATE_CLOSING_CONNECTION,
INT_STATE_CLOSE_ON_ERROR,
INT_STATE_ERROR
} eIsoClientInternalState;
struct sIsoClientConnection
{
IsoConnectionParameters parameters;
IsoIndicationCallback callback;
void* callbackParameter;
volatile eIsoClientInternalState intState;
volatile int state;
Semaphore stateMutex;
uint64_t nextReadTimeout;
Socket socket;
#if (CONFIG_MMS_SUPPORT_TLS == 1)
@ -81,20 +98,14 @@ struct sIsoClientConnection
Semaphore transmitBufferMutex;
ByteBuffer* receivePayloadBuffer;
Semaphore receiveBufferMutex;
Semaphore tickMutex;
uint8_t* cotpReadBuf;
uint8_t* cotpWriteBuf;
ByteBuffer* cotpReadBuffer;
ByteBuffer* cotpWriteBuffer;
volatile bool handlingThreadRunning;
volatile bool stopHandlingThread;
volatile bool destroyHandlingThread;
volatile bool startHandlingThread;
Thread thread;
};
static void
@ -117,387 +128,565 @@ getState(IsoClientConnection self)
return stateVal;
}
static void
connectionHandlingThread(IsoClientConnection self)
static inline void
setIntState(IsoClientConnection self, eIsoClientInternalState newState)
{
IsoSessionIndication sessionIndication;
self->intState = newState;
}
self->handlingThreadRunning = true;
static inline eIsoClientInternalState
getIntState(IsoClientConnection self)
{
return self->intState;
}
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: new connection %p\n", self);
IsoClientConnection
IsoClientConnection_create(IsoConnectionParameters parameters, IsoIndicationCallback callback, void* callbackParameter)
{
IsoClientConnection self = (IsoClientConnection) GLOBAL_CALLOC(1, sizeof(struct sIsoClientConnection));
/* Wait until lower layer association is finished */
Semaphore_wait(self->receiveBufferMutex);
if (self) {
CotpConnection_resetPayload(self->cotpConnection);
self->parameters = parameters;
self->callback = callback;
self->callbackParameter = callbackParameter;
while (true) {
self->intState = INT_STATE_IDLE;
self->state = STATE_IDLE;
self->stateMutex = Semaphore_create(1);
TpktState packetState;
self->sendBuffer = (uint8_t*) GLOBAL_MALLOC(ISO_CLIENT_BUFFER_SIZE);
while ((packetState = CotpConnection_readToTpktBuffer(self->cotpConnection)) == TPKT_WAITING)
{
Thread_sleep(1);
self->transmitPayloadBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
self->transmitPayloadBuffer->buffer = self->sendBuffer;
self->transmitPayloadBuffer->maxSize = ISO_CLIENT_BUFFER_SIZE;
if (self->stopHandlingThread) {
packetState = TPKT_ERROR;
break;
}
self->receivePayloadBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
self->callback(ISO_IND_TICK, self->callbackParameter, NULL);
}
self->transmitBufferMutex = Semaphore_create(1);
if (packetState == TPKT_ERROR)
break;
self->tickMutex = Semaphore_create(1);
CotpIndication cotpIndication = CotpConnection_parseIncomingMessage(self->cotpConnection);
self->receiveBuf = (uint8_t*) GLOBAL_MALLOC(ISO_CLIENT_BUFFER_SIZE);
self->receiveBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
ByteBuffer_wrap(self->receiveBuffer, self->receiveBuf, 0, ISO_CLIENT_BUFFER_SIZE);
if (cotpIndication == COTP_MORE_FRAGMENTS_FOLLOW)
continue;
self->presentation = (IsoPresentation*) GLOBAL_CALLOC(1, sizeof(IsoPresentation));
if (cotpIndication != COTP_DATA_INDICATION)
break;
self->session = (IsoSession*) GLOBAL_CALLOC(1, sizeof(IsoSession));
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: parse message\n");
self->cotpReadBuf = (uint8_t*) GLOBAL_MALLOC(CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
self->cotpWriteBuf = (uint8_t*) GLOBAL_MALLOC(CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
sessionIndication =
IsoSession_parseMessage(self->session,
CotpConnection_getPayload(self->cotpConnection));
self->cotpReadBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
ByteBuffer_wrap(self->cotpReadBuffer, self->cotpReadBuf, 0, CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
if (sessionIndication != SESSION_DATA) {
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: Invalid session message\n");
break;
}
self->cotpWriteBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
ByteBuffer_wrap(self->cotpWriteBuffer, self->cotpWriteBuf, 0, CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
self->cotpConnection = (CotpConnection*) GLOBAL_CALLOC(1, sizeof(CotpConnection));
}
return self;
}
static bool
sendConnectionRequestMessage(IsoClientConnection self)
{
/* COTP (ISO transport) handshake */
CotpConnection_init(self->cotpConnection, self->socket, self->receiveBuffer, self->cotpReadBuffer, self->cotpWriteBuffer);
#if (CONFIG_MMS_SUPPORT_TLS == 1)
if (params->tlsConfiguration) {
/* create TLSSocket and start TLS authentication */
TLSSocket tlsSocket = TLSSocket_create(self->socket, params->tlsConfiguration, false);
if (tlsSocket)
self->cotpConnection->tlsSocket = tlsSocket;
else {
if (!IsoPresentation_parseUserData(self->presentation, IsoSession_getUserData(self->session))) {
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: Invalid presentation message\n");
break;
printf("TLS handshake failed!\n");
goto returnError;
}
}
#endif /* (CONFIG_MMS_SUPPORT_TLS == 1) */
/* COTP (ISO transport) handshake */
CotpIndication cotpIndication =
CotpConnection_sendConnectionRequestMessage(self->cotpConnection, self->parameters);
self->callback(ISO_IND_DATA, self->callbackParameter,
&(self->presentation->nextPayload));
if (cotpIndication != COTP_OK)
return false;
else
return true;
}
/* wait for user to release the buffer */
Semaphore_wait(self->receiveBufferMutex);
static void
sendAcseInitiateRequest(IsoClientConnection self)
{
/* Upper layers handshake */
struct sBufferChain sAcsePayload;
BufferChain acsePayload = &sAcsePayload;
acsePayload->buffer = self->transmitPayloadBuffer->buffer;
acsePayload->partLength = self->transmitPayloadBuffer->size;
acsePayload->length = self->transmitPayloadBuffer->size;
acsePayload->nextPart = NULL;
CotpConnection_resetPayload(self->cotpConnection);
}
AcseConnection_init(&(self->acseConnection), NULL, NULL, NULL);
self->callback(ISO_IND_CLOSED, self->callbackParameter, NULL);;
AcseAuthenticationParameter authParameter = self->parameters->acseAuthParameter;
setState(self, STATE_IDLE);
struct sBufferChain sAcseBuffer;
BufferChain acseBuffer = &sAcseBuffer;
acseBuffer->buffer = self->sendBuffer + self->transmitPayloadBuffer->size;
acseBuffer->partMaxLength = ISO_CLIENT_BUFFER_SIZE - acsePayload->length;
AcseConnection_createAssociateRequestMessage(&(self->acseConnection), self->parameters, acseBuffer, acsePayload,
authParameter);
struct sBufferChain sPresentationBuffer;
BufferChain presentationBuffer = &sPresentationBuffer;
presentationBuffer->buffer = self->sendBuffer + acseBuffer->length;
presentationBuffer->partMaxLength = ISO_CLIENT_BUFFER_SIZE - acseBuffer->length;
IsoPresentation_init(self->presentation);
IsoPresentation_createConnectPdu(self->presentation, self->parameters, presentationBuffer, acseBuffer);
struct sBufferChain sSessionBuffer;
BufferChain sessionBuffer = &sSessionBuffer;
sessionBuffer->buffer = self->sendBuffer + presentationBuffer->length;
IsoSession_init(self->session);
IsoSession_createConnectSpdu(self->session, self->parameters, sessionBuffer,
presentationBuffer);
CotpConnection_sendDataMessage(self->cotpConnection, sessionBuffer);
Semaphore_post(self->transmitBufferMutex);
}
static void
releaseSocket(IsoClientConnection self)
{
if (self->socket) {
#if (CONFIG_MMS_SUPPORT_TLS == 1)
if (self->cotpConnection->tlsSocket)
TLSSocket_close(self->cotpConnection->tlsSocket);
#endif
Socket_destroy(self->socket);
Socket_destroy(self->socket);
self->socket = NULL;
}
}
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: exit connection %p\n", self);
/*
* Connection state machine
*
* called by tick function
*
* \return value indicates that connection is currently waiting and calling thread can be suspended
*/
bool
IsoClientConnection_handleConnection(IsoClientConnection self)
{
Semaphore_wait(self->tickMutex);
/* release buffer to enable reuse of client connection */
Semaphore_post(self->receiveBufferMutex);
bool waits = false;
self->handlingThreadRunning = false;
}
eIsoClientInternalState currentState = getIntState(self);
eIsoClientInternalState nextState = currentState;
static void*
connectionThreadFunction(void* parameter)
{
IsoClientConnection self = (IsoClientConnection) parameter;
switch (currentState) {
case INT_STATE_IDLE:
case INT_STATE_ERROR:
waits = true;
break;
while (self->destroyHandlingThread == false) {
case INT_STATE_TCP_CONNECTING:
{
SocketState socketState = Socket_checkAsyncConnectState(self->socket);
if (socketState == SOCKET_STATE_CONNECTED) {
if (sendConnectionRequestMessage(self)) {
self->nextReadTimeout = Hal_getTimeInMs() + CONFIG_TCP_READ_TIMEOUT_MS;
nextState = INT_STATE_WAIT_FOR_COTP_CONNECT_RESP;
}
else {
IsoClientConnection_releaseTransmitBuffer(self);
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
nextState = INT_STATE_CLOSE_ON_ERROR;
}
}
else if (socketState == SOCKET_STATE_FAILED) {
IsoClientConnection_releaseTransmitBuffer(self);
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
waits = true;
}
if (self->startHandlingThread) {
self->startHandlingThread = false;
connectionHandlingThread(self);
}
break;
Thread_sleep(1);
}
case INT_STATE_WAIT_FOR_COTP_CONNECT_RESP:
{
uint64_t currentTime = Hal_getTimeInMs();
self->destroyHandlingThread = false;
if (currentTime > self->nextReadTimeout) {
return NULL;
}
if (DEBUG_ISO_CLIENT)
printf("Timeout waiting for COTP CR\n");
IsoClientConnection_releaseTransmitBuffer(self);
IsoClientConnection
IsoClientConnection_create(IsoIndicationCallback callback, void* callbackParameter)
{
IsoClientConnection self = (IsoClientConnection) GLOBAL_CALLOC(1, sizeof(struct sIsoClientConnection));
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
if (self == NULL)
return NULL;
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
self->callback = callback;
self->callbackParameter = callbackParameter;
TpktState packetState = CotpConnection_readToTpktBuffer(self->cotpConnection);
self->state = STATE_IDLE;
self->stateMutex = Semaphore_create(1);
if (packetState == TPKT_PACKET_COMPLETE) {
self->sendBuffer = (uint8_t*) GLOBAL_MALLOC(ISO_CLIENT_BUFFER_SIZE);
CotpIndication cotpIndication = CotpConnection_parseIncomingMessage(self->cotpConnection);
self->transmitPayloadBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
self->transmitPayloadBuffer->buffer = self->sendBuffer;
self->transmitPayloadBuffer->maxSize = ISO_CLIENT_BUFFER_SIZE;
if (cotpIndication != COTP_CONNECT_INDICATION) {
if (DEBUG_ISO_CLIENT)
printf("Unexpected COTP state (%i)\n", cotpIndication);
self->receivePayloadBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
IsoClientConnection_releaseTransmitBuffer(self);
self->transmitBufferMutex = Semaphore_create(1);
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
self->receiveBufferMutex = Semaphore_create(1);
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
sendAcseInitiateRequest(self);
self->receiveBuf = (uint8_t*) GLOBAL_MALLOC(ISO_CLIENT_BUFFER_SIZE);
self->receiveBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
ByteBuffer_wrap(self->receiveBuffer, self->receiveBuf, 0, ISO_CLIENT_BUFFER_SIZE);
self->nextReadTimeout = Hal_getTimeInMs() + CONFIG_TCP_READ_TIMEOUT_MS;
self->presentation = (IsoPresentation*) GLOBAL_CALLOC(1, sizeof(IsoPresentation));
nextState = INT_STATE_WAIT_FOR_ACSE_RESP;
}
}
else if (packetState == TPKT_ERROR) {
if (DEBUG_ISO_CLIENT)
printf("Error receiving COTP message\n");
self->session = (IsoSession*) GLOBAL_CALLOC(1, sizeof(IsoSession));
IsoClientConnection_releaseTransmitBuffer(self);
self->cotpReadBuf = (uint8_t*) GLOBAL_MALLOC(CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
self->cotpWriteBuf = (uint8_t*) GLOBAL_MALLOC(CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
self->cotpReadBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
ByteBuffer_wrap(self->cotpReadBuffer, self->cotpReadBuf, 0, CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
waits = true;
}
self->cotpWriteBuffer = (ByteBuffer*) GLOBAL_CALLOC(1, sizeof(ByteBuffer));
ByteBuffer_wrap(self->cotpWriteBuffer, self->cotpWriteBuf, 0, CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
}
}
break;
self->cotpConnection = (CotpConnection*) GLOBAL_CALLOC(1, sizeof(CotpConnection));
case INT_STATE_WAIT_FOR_ACSE_RESP:
{
uint64_t currentTime = Hal_getTimeInMs();
self->handlingThreadRunning = false;
if (currentTime > self->nextReadTimeout) {
self->stopHandlingThread = false;
self->destroyHandlingThread = false;
self->startHandlingThread = false;
if (DEBUG_ISO_CLIENT)
printf("Timeout waiting for ACSE initiate response\n");
return self;
}
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
void
IsoClientConnection_associate(IsoClientConnection self, IsoConnectionParameters params,
ByteBuffer* payload, uint32_t connectTimeoutInMs)
{
self->socket = TcpSocket_create();
nextState = INT_STATE_ERROR;
}
else {
Socket_setConnectTimeout(self->socket, connectTimeoutInMs);
TpktState packetState = CotpConnection_readToTpktBuffer(self->cotpConnection);
#if (CONFIG_ACTIVATE_TCP_KEEPALIVE == 1)
Socket_activateTcpKeepAlive(self->socket,
CONFIG_TCP_KEEPALIVE_IDLE,
CONFIG_TCP_KEEPALIVE_INTERVAL,
CONFIG_TCP_KEEPALIVE_CNT);
#endif
// (1) Function blocks
if (!Socket_connect(self->socket, params->hostname, params->tcpPort))
goto returnError;
if (packetState == TPKT_PACKET_COMPLETE) {
// (2) Send connection request message
CotpIndication cotpIndication = CotpConnection_parseIncomingMessage(self->cotpConnection);
/* COTP (ISO transport) handshake */
CotpConnection_init(self->cotpConnection, self->socket, self->receiveBuffer, self->cotpReadBuffer, self->cotpWriteBuffer);
if (cotpIndication != COTP_DATA_INDICATION) {
if (DEBUG_ISO_CLIENT)
printf("Unexpected COTP state (%i)\n", cotpIndication);
#if (CONFIG_MMS_SUPPORT_TLS == 1)
if (params->tlsConfiguration) {
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
/* create TLSSocket and start TLS authentication */
TLSSocket tlsSocket = TLSSocket_create(self->socket, params->tlsConfiguration, false);
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
if (tlsSocket)
self->cotpConnection->tlsSocket = tlsSocket;
else {
/* parse ACSE response */
if (DEBUG_ISO_CLIENT)
printf("TLS handshake failed!\n");
IsoSessionIndication sessionIndication;
goto returnError;
}
}
#endif /* (CONFIG_MMS_SUPPORT_TLS == 1) */
sessionIndication =
IsoSession_parseMessage(self->session, CotpConnection_getPayload(self->cotpConnection));
if (sessionIndication != SESSION_CONNECT) {
if (DEBUG_ISO_CLIENT)
printf("IsoClientConnection_associate: no session connect indication\n");
/* COTP (ISO transport) handshake */
CotpIndication cotpIndication =
CotpConnection_sendConnectionRequestMessage(self->cotpConnection, params);
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
TpktState packetState;
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
uint64_t timeout = Hal_getTimeInMs() + CONFIG_TCP_READ_TIMEOUT_MS;
if (IsoPresentation_parseAcceptMessage(self->presentation, IsoSession_getUserData(self->session)) == false) {
// (3) Waiting for response (blocking)
while (((packetState = CotpConnection_readToTpktBuffer(self->cotpConnection)) == TPKT_WAITING)
&& (Hal_getTimeInMs() < timeout))
{
Thread_sleep(1);
}
if (DEBUG_ISO_CLIENT)
printf("IsoClientConnection_associate: no presentation ok indication\n");
if (packetState != TPKT_PACKET_COMPLETE)
goto returnError;
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
cotpIndication = CotpConnection_parseIncomingMessage(self->cotpConnection);
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
if (cotpIndication != COTP_CONNECT_INDICATION)
goto returnError;
AcseIndication acseIndication = AcseConnection_parseMessage(&(self->acseConnection), &self->presentation->nextPayload);
// (4) Send ACSE Initiate request
if (acseIndication != ACSE_ASSOCIATE) {
if (DEBUG_ISO_CLIENT)
printf("IsoClientConnection_associate: no ACSE_ASSOCIATE indication\n");
/* Upper layers handshake */
struct sBufferChain sAcsePayload;
BufferChain acsePayload = &sAcsePayload;
acsePayload->buffer = payload->buffer;
acsePayload->partLength = payload->size;
acsePayload->length = payload->size;
acsePayload->nextPart = NULL;
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
AcseConnection_init(&(self->acseConnection), NULL, NULL, NULL);
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
AcseAuthenticationParameter authParameter = params->acseAuthParameter;
}
struct sBufferChain sAcseBuffer;
BufferChain acseBuffer = &sAcseBuffer;
}
acseBuffer->buffer = self->sendBuffer + payload->size;
acseBuffer->partMaxLength = ISO_CLIENT_BUFFER_SIZE - acsePayload->length;
ByteBuffer_wrap(self->receivePayloadBuffer, self->acseConnection.userDataBuffer,
self->acseConnection.userDataBufferSize, self->acseConnection.userDataBufferSize);
AcseConnection_createAssociateRequestMessage(&(self->acseConnection), params, acseBuffer, acsePayload,
authParameter);
setState(self, STATE_CONNECTED);
nextState = INT_STATE_WAIT_FOR_DATA_MSG;
struct sBufferChain sPresentationBuffer;
BufferChain presentationBuffer = &sPresentationBuffer;
self->callback(ISO_IND_ASSOCIATION_SUCCESS, self->callbackParameter, self->receivePayloadBuffer);
presentationBuffer->buffer = self->sendBuffer + acseBuffer->length;
presentationBuffer->partMaxLength = ISO_CLIENT_BUFFER_SIZE - acseBuffer->length;
CotpConnection_resetPayload(self->cotpConnection);
}
}
}
else if (packetState == TPKT_ERROR) {
if (DEBUG_ISO_CLIENT)
printf("Error receiving COTP message\n");
IsoPresentation_init(self->presentation);
IsoPresentation_createConnectPdu(self->presentation, params, presentationBuffer, acseBuffer);
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
struct sBufferChain sSessionBuffer;
BufferChain sessionBuffer = &sSessionBuffer;
sessionBuffer->buffer = self->sendBuffer + presentationBuffer->length;
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
waits = true;
}
IsoSession_init(self->session);
IsoSession_createConnectSpdu(self->session, params, sessionBuffer,
presentationBuffer);
}
}
break;
CotpConnection_sendDataMessage(self->cotpConnection, sessionBuffer);
case INT_STATE_WAIT_FOR_DATA_MSG:
{
TpktState packetState = CotpConnection_readToTpktBuffer(self->cotpConnection);
Semaphore_post(self->transmitBufferMutex);
if (packetState == TPKT_ERROR) {
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else if (packetState == TPKT_PACKET_COMPLETE) {
while ((packetState = CotpConnection_readToTpktBuffer(self->cotpConnection)) == TPKT_WAITING)
{
Thread_sleep(1);
}
CotpIndication cotpIndication = CotpConnection_parseIncomingMessage(self->cotpConnection);
cotpIndication = CotpConnection_parseIncomingMessage(self->cotpConnection);
switch (cotpIndication) {
if (cotpIndication != COTP_DATA_INDICATION)
goto returnError;
case COTP_MORE_FRAGMENTS_FOLLOW:
break;
IsoSessionIndication sessionIndication;
case COTP_DISCONNECT_INDICATION:
{
nextState = INT_STATE_CLOSING_CONNECTION;
}
break;
sessionIndication =
IsoSession_parseMessage(self->session, CotpConnection_getPayload(self->cotpConnection));
case COTP_DATA_INDICATION:
{
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: parse message\n");
if (sessionIndication != SESSION_CONNECT) {
if (DEBUG_ISO_CLIENT)
printf("IsoClientConnection_associate: no session connect indication\n");
goto returnError;
}
IsoSessionIndication sessionIndication =
IsoSession_parseMessage(self->session,
CotpConnection_getPayload(self->cotpConnection));
if (!IsoPresentation_parseAcceptMessage(self->presentation, IsoSession_getUserData(self->session))) {
if (DEBUG_ISO_CLIENT)
printf("IsoClientConnection_associate: no presentation ok indication\n");
goto returnError;
}
if (sessionIndication != SESSION_DATA) {
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: Invalid session message\n");
AcseIndication acseIndication;
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
// (5) Wait for ACSE initiate response message
if (!IsoPresentation_parseUserData(self->presentation, IsoSession_getUserData(self->session))) {
acseIndication = AcseConnection_parseMessage(&(self->acseConnection), &self->presentation->nextPayload);
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT_CONNECTION: Invalid presentation message\n");
if (acseIndication != ACSE_ASSOCIATE) {
if (DEBUG_ISO_CLIENT)
printf("IsoClientConnection_associate: no ACSE_ASSOCIATE indication\n");
goto returnError;
}
nextState = INT_STATE_CLOSE_ON_ERROR;
}
else {
self->callback(ISO_IND_DATA, self->callbackParameter,
&(self->presentation->nextPayload));
CotpConnection_resetPayload(self->cotpConnection);
}
}
}
break;
default:
{
nextState = INT_STATE_CLOSE_ON_ERROR;
}
break;
}
}
else {
waits = true;
}
}
break;
ByteBuffer_wrap(self->receivePayloadBuffer, self->acseConnection.userDataBuffer,
self->acseConnection.userDataBufferSize, self->acseConnection.userDataBufferSize);
case INT_STATE_CLOSE_ON_ERROR:
{
setState(self, STATE_ERROR);
self->callback(ISO_IND_CLOSED, self->callbackParameter, NULL);;
releaseSocket(self);
nextState = INT_STATE_ERROR;
}
break;
Semaphore_wait(self->receiveBufferMutex);
case INT_STATE_CLOSING_CONNECTION:
{
setState(self, STATE_IDLE);
self->callback(ISO_IND_ASSOCIATION_SUCCESS, self->callbackParameter, self->receivePayloadBuffer);
self->callback(ISO_IND_CLOSED, self->callbackParameter, NULL);;
/* wait for upper layer to release buffer */
Semaphore_wait(self->receiveBufferMutex);
releaseSocket(self);
setState(self, STATE_ASSOCIATED);
nextState = INT_STATE_IDLE;
}
break;
if (self->thread == NULL) {
self->thread = Thread_create(connectionThreadFunction, self, false);
Thread_start(self->thread);
}
self->startHandlingThread = true;
self->callback(ISO_IND_TICK, self->callbackParameter, NULL);
setIntState(self, nextState);
while (self->handlingThreadRunning == false)
Thread_sleep(1);
Semaphore_post(self->tickMutex);
return;
return waits;
}
returnError:
self->callback(ISO_IND_ASSOCIATION_FAILED, self->callbackParameter, NULL);
setState(self, STATE_ERROR);
bool
IsoClientConnection_associateAsync(IsoClientConnection self, uint32_t connectTimeoutInMs)
{
bool success = true;
Socket_destroy(self->socket);
self->socket = NULL;
/* Create socket and start connect */
setState(self, STATE_CONNECTING);
Semaphore_post(self->transmitBufferMutex);
Semaphore_wait(self->tickMutex);
self->socket = TcpSocket_create();
Socket_setConnectTimeout(self->socket, connectTimeoutInMs);
return;
#if (CONFIG_ACTIVATE_TCP_KEEPALIVE == 1)
Socket_activateTcpKeepAlive(self->socket,
CONFIG_TCP_KEEPALIVE_IDLE,
CONFIG_TCP_KEEPALIVE_INTERVAL,
CONFIG_TCP_KEEPALIVE_CNT);
#endif
setIntState(self, INT_STATE_TCP_CONNECTING);
if (Socket_connectAsync(self->socket, self->parameters->hostname, self->parameters->tcpPort) == false) {
Socket_destroy(self->socket);
self->socket = NULL;
setIntState(self, INT_STATE_ERROR);
setState(self, STATE_ERROR);
IsoClientConnection_releaseTransmitBuffer(self);
success = false;
}
Semaphore_post(self->tickMutex);
return success;
}
void
IsoClientConnection_sendMessage(IsoClientConnection self, ByteBuffer* payloadBuffer)
{
if (getState(self) == STATE_CONNECTED) {
struct sBufferChain payloadBCMemory;
BufferChain payload = &payloadBCMemory;
struct sBufferChain payloadBCMemory;
BufferChain payload = &payloadBCMemory;
BufferChain_init(payload, payloadBuffer->size, payloadBuffer->size, NULL, payloadBuffer->buffer);
BufferChain_init(payload, payloadBuffer->size, payloadBuffer->size, NULL, payloadBuffer->buffer);
struct sBufferChain presentationBCMemory;
BufferChain presentationBuffer = &presentationBCMemory;
struct sBufferChain presentationBCMemory;
BufferChain presentationBuffer = &presentationBCMemory;
presentationBuffer->buffer = self->sendBuffer + payload->length;
presentationBuffer->partMaxLength = ISO_CLIENT_BUFFER_SIZE;
presentationBuffer->buffer = self->sendBuffer + payload->length;
presentationBuffer->partMaxLength = ISO_CLIENT_BUFFER_SIZE;
IsoPresentation_createUserData(self->presentation, presentationBuffer, payload);
IsoPresentation_createUserData(self->presentation, presentationBuffer, payload);
struct sBufferChain sessionBufferBCMemory;
BufferChain sessionBuffer = &sessionBufferBCMemory;
struct sBufferChain sessionBufferBCMemory;
BufferChain sessionBuffer = &sessionBufferBCMemory;
IsoSession_createDataSpdu(self->session, sessionBuffer, presentationBuffer);
IsoSession_createDataSpdu(self->session, sessionBuffer, presentationBuffer);
CotpIndication indication = CotpConnection_sendDataMessage(self->cotpConnection, sessionBuffer);
CotpIndication indication = CotpConnection_sendDataMessage(self->cotpConnection, sessionBuffer);
if (indication != COTP_OK)
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT: IsoClientConnection_sendMessage: send message failed!\n");
}
else {
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT: Not connected --> cannot send message\n");
}
/* release transmit buffer for use by API client */
Semaphore_post(self->transmitBufferMutex);
if (indication != COTP_OK)
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT: IsoClientConnection_sendMessage: send message failed!\n");
}
void
@ -506,13 +695,21 @@ IsoClientConnection_close(IsoClientConnection self)
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT: IsoClientConnection_close\n");
if (self->handlingThreadRunning) {
self->stopHandlingThread = true;
while (self->handlingThreadRunning)
Thread_sleep(1);
}
Semaphore_wait(self->tickMutex);
eIsoClientInternalState intState = getIntState(self);
if ((intState != INT_STATE_IDLE) && (intState != INT_STATE_ERROR) && (intState != INT_STATE_CLOSE_ON_ERROR)) {
setIntState(self, INT_STATE_CLOSING_CONNECTION);
setState(self, STATE_IDLE);
Semaphore_post(self->tickMutex);
IsoClientConnection_handleConnection(self);
setState(self, STATE_IDLE);
}
else {
Semaphore_post(self->tickMutex);
}
}
@ -522,7 +719,9 @@ IsoClientConnection_destroy(IsoClientConnection self)
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT: IsoClientConnection_destroy\n");
if (getState(self) == STATE_ASSOCIATED) {
int state = getState(self);
if (state == STATE_CONNECTED) {
if (DEBUG_ISO_CLIENT)
printf("ISO_CLIENT: call IsoClientConnection_close\n");
@ -530,16 +729,6 @@ IsoClientConnection_destroy(IsoClientConnection self)
IsoClientConnection_close(self);
}
/* stop handling thread */
self->destroyHandlingThread = true;
if (self->thread != NULL) {
while (self->destroyHandlingThread)
Thread_sleep(1);
Thread_destroy(self->thread);
}
if (self->receiveBuf != NULL)
GLOBAL_FREEMEM(self->receiveBuf);
if (self->receiveBuffer != NULL)
@ -567,18 +756,17 @@ IsoClientConnection_destroy(IsoClientConnection self)
GLOBAL_FREEMEM(self->transmitPayloadBuffer);
GLOBAL_FREEMEM(self->receivePayloadBuffer);
Semaphore_destroy(self->receiveBufferMutex);
Semaphore_destroy(self->transmitBufferMutex);
Semaphore_destroy(self->stateMutex);
Semaphore_destroy(self->tickMutex);
GLOBAL_FREEMEM(self->sendBuffer);
GLOBAL_FREEMEM(self);
}
bool
IsoClientConnection_abort(IsoClientConnection self)
static void
sendAbortMessage(IsoClientConnection self)
{
//TODO block other messages from being sent
IsoClientConnection_allocateTransmitBuffer(self);
struct sBufferChain sAcseBuffer;
@ -608,15 +796,12 @@ IsoClientConnection_abort(IsoClientConnection self)
CotpConnection_sendDataMessage(self->cotpConnection, sessionBuffer);
Semaphore_post(self->transmitBufferMutex);
}
uint64_t timeout = Hal_getTimeInMs() + CONFIG_TCP_READ_TIMEOUT_MS;
while ((self->handlingThreadRunning == true) && (Hal_getTimeInMs() < timeout));
if (self->handlingThreadRunning)
return false;
else
return true;
void
IsoClientConnection_abortAsync(IsoClientConnection self)
{
sendAbortMessage(self);
}
void
@ -669,9 +854,3 @@ IsoClientConnection_releaseTransmitBuffer(IsoClientConnection self)
{
Semaphore_post(self->transmitBufferMutex);
}
void
IsoClientConnection_releaseReceiveBuffer(IsoClientConnection self)
{
Semaphore_post(self->receiveBufferMutex);
}

File diff suppressed because it is too large Load Diff

@ -160,10 +160,8 @@ parseInitResponseDetail(MmsConnection self, uint8_t* buffer, int bufPos, int max
}
bool
mmsClient_parseInitiateResponse(MmsConnection self)
mmsClient_parseInitiateResponse(MmsConnection self, ByteBuffer* response)
{
bool result = false;
self->parameters.maxPduSize = CONFIG_MMS_MAXIMUM_PDU_SIZE;
self->parameters.dataStructureNestingLevel = DEFAULT_DATA_STRUCTURE_NESTING_LEVEL;
self->parameters.maxServOutstandingCalled = DEFAULT_MAX_SERV_OUTSTANDING_CALLED;
@ -171,8 +169,8 @@ mmsClient_parseInitiateResponse(MmsConnection self)
int bufPos = 1; /* ignore tag - already checked */
int maxBufPos = ByteBuffer_getSize(self->lastResponse);
uint8_t* buffer = ByteBuffer_getBuffer(self->lastResponse);
int maxBufPos = ByteBuffer_getSize(response);
uint8_t* buffer = ByteBuffer_getBuffer(response);
int length;
bufPos = BerDecoder_decodeLength(buffer, &length, bufPos, maxBufPos);
@ -237,5 +235,5 @@ mmsClient_parseInitiateResponse(MmsConnection self)
}
return result;
return true;
}

Loading…
Cancel
Save