- fixed TCP performance problem for Linux (LIB61850-243)

pull/383/head
Michael Zillgith 4 years ago
parent 161e88a3ef
commit 6525a72930

@ -329,6 +329,8 @@ ServerSocket_accept(ServerSocket self)
conSocket = (Socket) GLOBAL_CALLOC(1, sizeof(struct sSocket));
conSocket->fd = fd;
setSocketNonBlocking(conSocket);
activateTcpNoDelay(conSocket);
}
@ -658,6 +660,10 @@ Socket_write(Socket self, uint8_t* buf, int size)
printf("DEBUG_SOCKET: send returned error (errno=%i)\n", errno);
}
}
else {
if (size != retVal)
printf("send(%i)->%i\n", size, retVal);
}
return retVal;
}

@ -1,7 +1,7 @@
/*
* reporting.c
*
* Copyright 2013-2020 Michael Zillgith
* Copyright 2013-2021 Michael Zillgith
*
* This file is part of libIEC61850.
*
@ -2651,20 +2651,25 @@ exit_function:
return;
} /* enqueuReport() */
#define SENT_REPORT_ENTRY_FAILED 0
#define SENT_REPORT_ENTRY_FINISHED 1
#define SENT_REPORT_ENTRY_MORE_FOLLOWS 2
static bool
sendNextReportEntrySegment(ReportControl* self)
{
if (self->clientConnection == NULL)
return false;
return SENT_REPORT_ENTRY_FAILED;
if (self->reportBuffer->nextToTransmit == NULL)
return SENT_REPORT_ENTRY_FINISHED;
bool sentSuccess = true;
bool isBuffered = self->buffered;
int maxMmsPduSize = MmsServerConnection_getMaxMmsPduSize(self->clientConnection);
if (self->reportBuffer->nextToTransmit == NULL) {
return false;
}
int estimatedSegmentSize = 19; /* maximum size of header information (header can have 13-19 byte) */
estimatedSegmentSize += 8; /* reserve space for more-segments-follow (3 byte) and sub-seq-num (3-5 byte) */
@ -3102,7 +3107,7 @@ sendNextReportEntrySegment(ReportControl* self)
if (DEBUG_IED_SERVER)
printf("IED_SERVER: internal error in report buffer\n");
return false;
return SENT_REPORT_ENTRY_FAILED;
}
int dataElementSize = 1 + lenSize + length;
@ -3174,12 +3179,17 @@ sendNextReportEntrySegment(ReportControl* self)
reportBuffer->size = bufPos;
MmsServerConnection_sendMessage(self->clientConnection, reportBuffer);
sentSuccess = MmsServerConnection_sendMessage(self->clientConnection, reportBuffer);
MmsServer_releaseTransmitBuffer(self->server->mmsServer);
IsoConnection_unlock(self->clientConnection->isoConnection);
if (sentSuccess == false) {
moreFollows = false;
goto exit_function;
}
if (moreFollows == false) {
/* reset sub sequence number */
segmented = false;
@ -3222,7 +3232,14 @@ sendNextReportEntrySegment(ReportControl* self)
exit_function:
self->segmented = segmented;
return moreFollows;
if (sentSuccess == false)
return SENT_REPORT_ENTRY_FAILED;
if (moreFollows)
return SENT_REPORT_ENTRY_MORE_FOLLOWS;
else
return SENT_REPORT_ENTRY_FINISHED;
}
static void
@ -3234,8 +3251,25 @@ sendNextReportEntry(ReportControl* self)
while (self->reportBuffer->nextToTransmit) {
messageCount++;
while (sendNextReportEntrySegment(self)) {
messageCount++;
bool sendNextEntrySegment = true;
int sendResult = SENT_REPORT_ENTRY_FAILED;
while (sendNextEntrySegment) {
sendResult = sendNextReportEntrySegment(self);
if (sendResult != SENT_REPORT_ENTRY_FAILED) {
messageCount++;
}
if (sendResult != SENT_REPORT_ENTRY_MORE_FOLLOWS)
sendNextEntrySegment = false;
}
if (sendResult == SENT_REPORT_ENTRY_FAILED) {
//printf("Failed to send report\n");
break;
}
if (messageCount > 100)

@ -55,6 +55,10 @@ typedef struct {
ByteBuffer* writeBuffer; /* buffer to store TPKT packet to send */
ByteBuffer* readBuffer; /* buffer to store received TPKT packet */
uint16_t packetSize; /* size of the packet currently received */
uint8_t* socketExtensionBuffer; /* buffer to store data when TCP socket is not accepting all data */
int socketExtensionBufferSize; /* maximum number of bytes to store in the extension buffer */
int socketExtensionBufferFill; /* number of bytes in the extension buffer (bytes to write) */
} CotpConnection;
typedef enum {

@ -98,7 +98,7 @@ IsoConnection_getSecurityToken(IsoConnection self);
* \param handlerMode specifies if this function is used in the context of the connection handling thread
* (handlerMode)
*/
LIB61850_INTERNAL void
LIB61850_INTERNAL bool
IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message);
LIB61850_INTERNAL IsoServer

@ -51,7 +51,7 @@ MmsServerConnection_destroy(MmsServerConnection connection);
LIB61850_INTERNAL int
MmsServerConnection_getMaxMmsPduSize(MmsServerConnection self);
LIB61850_INTERNAL void
LIB61850_INTERNAL bool
MmsServerConnection_sendMessage(MmsServerConnection self, ByteBuffer* message);
LIB61850_INTERNAL bool

@ -183,16 +183,30 @@ sendBuffer(CotpConnection* self)
bool retVal = false;
do {
int sentBytes = writeToSocket(self, buffer, remainingSize);
int sentBytes = writeToSocket(self, buffer, remainingSize);
if (sentBytes == -1)
goto exit_function;
if (sentBytes == -1)
goto exit_function;
if (sentBytes != remainingSize) {
buffer += sentBytes;
remainingSize -= sentBytes;
/* write additional data to extension buffer */
if (self->socketExtensionBuffer) {
uint8_t* extBuf = self->socketExtensionBuffer;
int extCurrentPos = self->socketExtensionBufferFill;
int bytesNotSent = remainingSize - sentBytes;
} while (remainingSize > 0);
int i;
for (i = 0; i < bytesNotSent; i++) {
extBuf[i + extCurrentPos] = buffer[sentBytes + i];
}
self->socketExtensionBufferFill = extCurrentPos + bytesNotSent;
}
else {
goto exit_function;
}
}
retVal = true;
@ -202,6 +216,33 @@ exit_function:
return retVal;
}
static void
flushBuffer(CotpConnection* self)
{
if (self->socketExtensionBufferFill > 0) {
int sentBytes = writeToSocket(self, self->socketExtensionBuffer, self->socketExtensionBufferFill);
if (sentBytes > 0) {
if (sentBytes != self->socketExtensionBufferFill) {
int target = 0;
int i;
uint8_t* buf = self->socketExtensionBuffer;
for (i = sentBytes; i < self->socketExtensionBufferFill; i++) {
buf[target++] = buf[i];
}
self->socketExtensionBufferFill = self->socketExtensionBufferFill - sentBytes;
}
else {
self->socketExtensionBufferFill = 0;
}
}
}
}
CotpIndication
CotpConnection_sendDataMessage(CotpConnection* self, BufferChain payload)
{
@ -218,6 +259,21 @@ CotpConnection_sendDataMessage(CotpConnection* self, BufferChain payload)
fragments += 1;
}
/* calculate total size of fragmented message */
int totalSize = (fragments * (COTP_DATA_HEADER_SIZE + 4)) + payload->length;
/* try to flush extension buffer */
flushBuffer(self);
/* check if totalSize will fit in extension buffer */
if (self->socketExtensionBuffer) {
int freeExtBufSize = self->socketExtensionBufferSize - self->socketExtensionBufferFill;
if (freeExtBufSize < totalSize) {
return COTP_ERROR;
}
}
int currentBufPos = 0;
int currentLimit;
int lastUnit;
@ -479,6 +535,10 @@ CotpConnection_init(CotpConnection* self, Socket socket,
self->writeBuffer = writeBuffer;
self->readBuffer = readBuffer;
self->packetSize = 0;
self->socketExtensionBuffer = GLOBAL_MALLOC(CONFIG_MMS_MAXIMUM_PDU_SIZE + 120);
self->socketExtensionBufferSize = CONFIG_MMS_MAXIMUM_PDU_SIZE + 120;
self->socketExtensionBufferFill = 0;
}
int /* in byte */

@ -796,10 +796,10 @@ MmsServerConnection_getMaxMmsPduSize(MmsServerConnection self)
return self->maxPduSize;
}
void
bool
MmsServerConnection_sendMessage(MmsServerConnection self, ByteBuffer* message)
{
IsoConnection_sendMessage(self->isoConnection, message);
return IsoConnection_sendMessage(self->isoConnection, message);
}
#if (MMS_DYNAMIC_DATA_SETS == 1)

@ -635,9 +635,11 @@ IsoConnection_unlock(IsoConnection self)
#endif
}
void
bool
IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message)
{
bool success = false;
if (self->state == ISO_CON_STATE_STOPPED) {
if (DEBUG_ISO_SERVER)
printf("DEBUG_ISO_SERVER: sendMessage: connection already stopped!\n");
@ -677,8 +679,11 @@ IsoConnection_sendMessage(IsoConnection self, ByteBuffer* message)
printf("ISO_SERVER: IsoConnection_sendMessage success!\n");
}
if (indication == COTP_OK)
success = true;
exit_error:
return;
return success;
}
void

Loading…
Cancel
Save