Merge branch 'v1.5_tcp_perf_problem' into v1.5_issue_243

pull/383/head
Michael Zillgith 4 years ago
commit e0b5ed180f

@ -330,6 +330,8 @@ ServerSocket_accept(ServerSocket self)
if (conSocket) { if (conSocket) {
conSocket->fd = fd; conSocket->fd = fd;
setSocketNonBlocking(conSocket);
activateTcpNoDelay(conSocket); activateTcpNoDelay(conSocket);
} }
else { else {

@ -1,7 +1,7 @@
/* /*
* reporting.c * reporting.c
* *
* Copyright 2013-2020 Michael Zillgith * Copyright 2013-2021 Michael Zillgith
* *
* This file is part of libIEC61850. * This file is part of libIEC61850.
* *
@ -2673,20 +2673,25 @@ exit_function:
return; return;
} /* enqueuReport() */ } /* enqueuReport() */
#define SENT_REPORT_ENTRY_FAILED 0
#define SENT_REPORT_ENTRY_FINISHED 1
#define SENT_REPORT_ENTRY_MORE_FOLLOWS 2
static bool static bool
sendNextReportEntrySegment(ReportControl* self) sendNextReportEntrySegment(ReportControl* self)
{ {
if (self->clientConnection == NULL) if (self->clientConnection == NULL)
return false; return SENT_REPORT_ENTRY_FAILED;
if (self->reportBuffer->nextToTransmit == NULL)
return SENT_REPORT_ENTRY_FINISHED;
bool sentSuccess = true;
bool isBuffered = self->buffered; bool isBuffered = self->buffered;
int maxMmsPduSize = MmsServerConnection_getMaxMmsPduSize(self->clientConnection); int maxMmsPduSize = MmsServerConnection_getMaxMmsPduSize(self->clientConnection);
if (self->reportBuffer->nextToTransmit == NULL) {
return false;
}
int estimatedSegmentSize = 19; /* maximum size of header information (header can have 13-19 byte) */ int estimatedSegmentSize = 19; /* maximum size of header information (header can have 13-19 byte) */
estimatedSegmentSize += 8; /* reserve space for more-segments-follow (3 byte) and sub-seq-num (3-5 byte) */ estimatedSegmentSize += 8; /* reserve space for more-segments-follow (3 byte) and sub-seq-num (3-5 byte) */
@ -3122,7 +3127,7 @@ sendNextReportEntrySegment(ReportControl* self)
if (DEBUG_IED_SERVER) if (DEBUG_IED_SERVER)
printf("IED_SERVER: internal error in report buffer\n"); printf("IED_SERVER: internal error in report buffer\n");
return false; return SENT_REPORT_ENTRY_FAILED;
} }
int dataElementSize = 1 + lenSize + length; int dataElementSize = 1 + lenSize + length;
@ -3194,12 +3199,17 @@ sendNextReportEntrySegment(ReportControl* self)
reportBuffer->size = bufPos; reportBuffer->size = bufPos;
MmsServerConnection_sendMessage(self->clientConnection, reportBuffer); sentSuccess = MmsServerConnection_sendMessage(self->clientConnection, reportBuffer);
MmsServer_releaseTransmitBuffer(self->server->mmsServer); MmsServer_releaseTransmitBuffer(self->server->mmsServer);
IsoConnection_unlock(self->clientConnection->isoConnection); IsoConnection_unlock(self->clientConnection->isoConnection);
if (sentSuccess == false) {
moreFollows = false;
goto exit_function;
}
if (moreFollows == false) { if (moreFollows == false) {
/* reset sub sequence number */ /* reset sub sequence number */
segmented = false; segmented = false;
@ -3242,7 +3252,14 @@ sendNextReportEntrySegment(ReportControl* self)
exit_function: exit_function:
self->segmented = segmented; self->segmented = segmented;
return moreFollows;
if (sentSuccess == false)
return SENT_REPORT_ENTRY_FAILED;
if (moreFollows)
return SENT_REPORT_ENTRY_MORE_FOLLOWS;
else
return SENT_REPORT_ENTRY_FINISHED;
} }
static void static void
@ -3254,8 +3271,25 @@ sendNextReportEntry(ReportControl* self)
while (self->reportBuffer->nextToTransmit) { while (self->reportBuffer->nextToTransmit) {
messageCount++; messageCount++;
while (sendNextReportEntrySegment(self)) {
messageCount++; bool sendNextEntrySegment = true;
int sendResult = SENT_REPORT_ENTRY_FAILED;
while (sendNextEntrySegment) {
sendResult = sendNextReportEntrySegment(self);
if (sendResult != SENT_REPORT_ENTRY_FAILED) {
messageCount++;
}
if (sendResult != SENT_REPORT_ENTRY_MORE_FOLLOWS)
sendNextEntrySegment = false;
}
if (sendResult == SENT_REPORT_ENTRY_FAILED) {
//printf("Failed to send report\n");
break;
} }
if (messageCount > 100) if (messageCount > 100)

@ -55,6 +55,10 @@ typedef struct {
ByteBuffer* writeBuffer; /* buffer to store TPKT packet to send */ ByteBuffer* writeBuffer; /* buffer to store TPKT packet to send */
ByteBuffer* readBuffer; /* buffer to store received TPKT packet */ ByteBuffer* readBuffer; /* buffer to store received TPKT packet */
uint16_t packetSize; /* size of the packet currently received */ uint16_t packetSize; /* size of the packet currently received */
uint8_t* socketExtensionBuffer; /* buffer to store data when TCP socket is not accepting all data */
int socketExtensionBufferSize; /* maximum number of bytes to store in the extension buffer */
int socketExtensionBufferFill; /* number of bytes in the extension buffer (bytes to write) */
} CotpConnection; } CotpConnection;
typedef enum { typedef enum {
@ -80,7 +84,8 @@ CotpConnection_setTpduSize(CotpConnection* self, int tpduSize /* in byte */);
LIB61850_INTERNAL void LIB61850_INTERNAL void
CotpConnection_init(CotpConnection* self, Socket socket, CotpConnection_init(CotpConnection* self, Socket socket,
ByteBuffer* payloadBuffer, ByteBuffer* readBuffer, ByteBuffer* writeBuffer); ByteBuffer* payloadBuffer, ByteBuffer* readBuffer, ByteBuffer* writeBuffer,
uint8_t* socketExtensionBuffer, int socketExtensionBufferSize);
LIB61850_INTERNAL CotpIndication LIB61850_INTERNAL CotpIndication
CotpConnection_parseIncomingMessage(CotpConnection* self); CotpConnection_parseIncomingMessage(CotpConnection* self);

@ -98,7 +98,7 @@ IsoConnection_getSecurityToken(IsoConnection self);
* \param handlerMode specifies if this function is used in the context of the connection handling thread * \param handlerMode specifies if this function is used in the context of the connection handling thread
* (handlerMode) * (handlerMode)
*/ */
LIB61850_INTERNAL void LIB61850_INTERNAL bool
IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message); IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message);
LIB61850_INTERNAL IsoServer LIB61850_INTERNAL IsoServer

@ -51,7 +51,7 @@ MmsServerConnection_destroy(MmsServerConnection connection);
LIB61850_INTERNAL int LIB61850_INTERNAL int
MmsServerConnection_getMaxMmsPduSize(MmsServerConnection self); MmsServerConnection_getMaxMmsPduSize(MmsServerConnection self);
LIB61850_INTERNAL void LIB61850_INTERNAL bool
MmsServerConnection_sendMessage(MmsServerConnection self, ByteBuffer* message); MmsServerConnection_sendMessage(MmsServerConnection self, ByteBuffer* message);
LIB61850_INTERNAL bool LIB61850_INTERNAL bool

@ -200,8 +200,12 @@ sendConnectionRequestMessage(IsoClientConnection self)
self->cotpConnection->handleSet = NULL; self->cotpConnection->handleSet = NULL;
} }
int socketExtensionBufferSize = CONFIG_MMS_MAXIMUM_PDU_SIZE + 1000;
uint8_t* socketExtensionBuffer = GLOBAL_MALLOC(socketExtensionBufferSize);
/* COTP (ISO transport) handshake */ /* COTP (ISO transport) handshake */
CotpConnection_init(self->cotpConnection, self->socket, self->receiveBuffer, self->cotpReadBuffer, self->cotpWriteBuffer); CotpConnection_init(self->cotpConnection, self->socket, self->receiveBuffer, self->cotpReadBuffer, self->cotpWriteBuffer,
socketExtensionBuffer, socketExtensionBufferSize);
#if (CONFIG_MMS_SUPPORT_TLS == 1) #if (CONFIG_MMS_SUPPORT_TLS == 1)
if (self->parameters->tlsConfiguration) { if (self->parameters->tlsConfiguration) {
@ -774,9 +778,13 @@ IsoClientConnection_destroy(IsoClientConnection self)
GLOBAL_FREEMEM(self->receiveBuf); GLOBAL_FREEMEM(self->receiveBuf);
if (self->receiveBuffer != NULL) if (self->receiveBuffer != NULL)
GLOBAL_FREEMEM(self->receiveBuffer); GLOBAL_FREEMEM(self->receiveBuffer);
if (self->cotpConnection != NULL) { if (self->cotpConnection != NULL) {
if (self->cotpConnection->handleSet != NULL) if (self->cotpConnection->handleSet != NULL)
Handleset_destroy(self->cotpConnection->handleSet); Handleset_destroy(self->cotpConnection->handleSet);
GLOBAL_FREEMEM(self->cotpConnection->socketExtensionBuffer);
GLOBAL_FREEMEM(self->cotpConnection); GLOBAL_FREEMEM(self->cotpConnection);
} }

@ -183,16 +183,30 @@ sendBuffer(CotpConnection* self)
bool retVal = false; bool retVal = false;
do { int sentBytes = writeToSocket(self, buffer, remainingSize);
int sentBytes = writeToSocket(self, buffer, remainingSize);
if (sentBytes == -1) if (sentBytes == -1)
goto exit_function; goto exit_function;
if (sentBytes != remainingSize) {
buffer += sentBytes; /* write additional data to extension buffer */
remainingSize -= sentBytes; if (self->socketExtensionBuffer) {
uint8_t* extBuf = self->socketExtensionBuffer;
int extCurrentPos = self->socketExtensionBufferFill;
int bytesNotSent = remainingSize - sentBytes;
} while (remainingSize > 0); int i;
for (i = 0; i < bytesNotSent; i++) {
extBuf[i + extCurrentPos] = buffer[sentBytes + i];
}
self->socketExtensionBufferFill = extCurrentPos + bytesNotSent;
}
else {
goto exit_function;
}
}
retVal = true; retVal = true;
@ -202,6 +216,33 @@ exit_function:
return retVal; return retVal;
} }
static void
flushBuffer(CotpConnection* self)
{
if (self->socketExtensionBufferFill > 0) {
int sentBytes = writeToSocket(self, self->socketExtensionBuffer, self->socketExtensionBufferFill);
if (sentBytes > 0) {
if (sentBytes != self->socketExtensionBufferFill) {
int target = 0;
int i;
uint8_t* buf = self->socketExtensionBuffer;
for (i = sentBytes; i < self->socketExtensionBufferFill; i++) {
buf[target++] = buf[i];
}
self->socketExtensionBufferFill = self->socketExtensionBufferFill - sentBytes;
}
else {
self->socketExtensionBufferFill = 0;
}
}
}
}
CotpIndication CotpIndication
CotpConnection_sendDataMessage(CotpConnection* self, BufferChain payload) CotpConnection_sendDataMessage(CotpConnection* self, BufferChain payload)
{ {
@ -218,6 +259,21 @@ CotpConnection_sendDataMessage(CotpConnection* self, BufferChain payload)
fragments += 1; fragments += 1;
} }
/* calculate total size of fragmented message */
int totalSize = (fragments * (COTP_DATA_HEADER_SIZE + 4)) + payload->length;
/* try to flush extension buffer */
flushBuffer(self);
/* check if totalSize will fit in extension buffer */
if (self->socketExtensionBuffer) {
int freeExtBufSize = self->socketExtensionBufferSize - self->socketExtensionBufferFill;
if (freeExtBufSize < totalSize) {
return COTP_ERROR;
}
}
int currentBufPos = 0; int currentBufPos = 0;
int currentLimit; int currentLimit;
int lastUnit; int lastUnit;
@ -447,7 +503,8 @@ cpo_error:
void void
CotpConnection_init(CotpConnection* self, Socket socket, CotpConnection_init(CotpConnection* self, Socket socket,
ByteBuffer* payloadBuffer, ByteBuffer* readBuffer, ByteBuffer* writeBuffer) ByteBuffer* payloadBuffer, ByteBuffer* readBuffer, ByteBuffer* writeBuffer,
uint8_t* socketExtensionBuffer, int socketExtensionBufferSize)
{ {
self->state = 0; self->state = 0;
self->socket = socket; self->socket = socket;
@ -479,6 +536,10 @@ CotpConnection_init(CotpConnection* self, Socket socket,
self->writeBuffer = writeBuffer; self->writeBuffer = writeBuffer;
self->readBuffer = readBuffer; self->readBuffer = readBuffer;
self->packetSize = 0; self->packetSize = 0;
self->socketExtensionBuffer = socketExtensionBuffer;
self->socketExtensionBufferSize = socketExtensionBufferSize;
self->socketExtensionBufferFill = 0;
} }
int /* in byte */ int /* in byte */

@ -796,10 +796,10 @@ MmsServerConnection_getMaxMmsPduSize(MmsServerConnection self)
return self->maxPduSize; return self->maxPduSize;
} }
void bool
MmsServerConnection_sendMessage(MmsServerConnection self, ByteBuffer* message) MmsServerConnection_sendMessage(MmsServerConnection self, ByteBuffer* message)
{ {
IsoConnection_sendMessage(self->isoConnection, message); return IsoConnection_sendMessage(self->isoConnection, message);
} }
#if (MMS_DYNAMIC_DATA_SETS == 1) #if (MMS_DYNAMIC_DATA_SETS == 1)

@ -122,6 +122,8 @@ finalizeIsoConnection(IsoConnection self)
if (self->cotpConnection) { if (self->cotpConnection) {
if (self->cotpConnection->handleSet) if (self->cotpConnection->handleSet)
Handleset_destroy(self->cotpConnection->handleSet); Handleset_destroy(self->cotpConnection->handleSet);
GLOBAL_FREEMEM(self->cotpConnection->socketExtensionBuffer);
} }
GLOBAL_FREEMEM(self->cotpConnection); GLOBAL_FREEMEM(self->cotpConnection);
@ -527,7 +529,10 @@ IsoConnection_create(Socket socket, IsoServer isoServer, bool isSingleThread)
ByteBuffer_wrap(&(self->cotpWriteBuffer), self->cotpWriteBuf, 0, CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE); ByteBuffer_wrap(&(self->cotpWriteBuffer), self->cotpWriteBuf, 0, CONFIG_COTP_MAX_TPDU_SIZE + TPKT_RFC1006_HEADER_SIZE);
self->cotpConnection = (CotpConnection*) GLOBAL_CALLOC(1, sizeof(CotpConnection)); self->cotpConnection = (CotpConnection*) GLOBAL_CALLOC(1, sizeof(CotpConnection));
CotpConnection_init(self->cotpConnection, self->socket, &(self->rcvBuffer), &(self->cotpReadBuffer), &(self->cotpWriteBuffer)); int socketExtensionBufferSize = CONFIG_MMS_MAXIMUM_PDU_SIZE + 1000;
uint8_t* socketExtensionBuffer = GLOBAL_MALLOC(socketExtensionBufferSize);
CotpConnection_init(self->cotpConnection, self->socket, &(self->rcvBuffer), &(self->cotpReadBuffer), &(self->cotpWriteBuffer),
socketExtensionBuffer, socketExtensionBufferSize);
#if (CONFIG_MMS_SUPPORT_TLS == 1) #if (CONFIG_MMS_SUPPORT_TLS == 1)
if (self->tlsSocket) if (self->tlsSocket)
@ -602,6 +607,8 @@ IsoConnection_destroy(IsoConnection self)
if (self->cotpConnection) { if (self->cotpConnection) {
if (self->cotpConnection->handleSet) if (self->cotpConnection->handleSet)
Handleset_destroy(self->cotpConnection->handleSet); Handleset_destroy(self->cotpConnection->handleSet);
GLOBAL_FREEMEM(self->cotpConnection->socketExtensionBuffer);
} }
GLOBAL_FREEMEM(self); GLOBAL_FREEMEM(self);
@ -635,9 +642,11 @@ IsoConnection_unlock(IsoConnection self)
#endif #endif
} }
void bool
IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message) IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message)
{ {
bool success = false;
if (self->state == ISO_CON_STATE_STOPPED) { if (self->state == ISO_CON_STATE_STOPPED) {
if (DEBUG_ISO_SERVER) if (DEBUG_ISO_SERVER)
printf("DEBUG_ISO_SERVER: sendMessage: connection already stopped!\n"); printf("DEBUG_ISO_SERVER: sendMessage: connection already stopped!\n");
@ -677,8 +686,11 @@ IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message)
printf("ISO_SERVER: IsoConnection_sendMessage success!\n"); printf("ISO_SERVER: IsoConnection_sendMessage success!\n");
} }
if (indication == COTP_OK)
success = true;
exit_error: exit_error:
return; return success;
} }
void void

Loading…
Cancel
Save