Am 03.06.22, 17:08, schrieb Stipe Tolj:
Hi all,
in certain conditions it may be feasible also to have MO throughput
control in terms of regulating how many messages we want to handle on
the MO side (including DLRs).
For this purpose I have prepared the attached patchset, that introduces
the optional splitting of the config directives:
group = smsc
throughput = X
would means X as MT and MO throughput constraint, and
group = smsc
throughput-mt = X
throughput-mo = Y
where MT wise we have a X TPS constraint and MO wise a Y TPS.
So far only the SMSC HTTP and SMSC SMPP modules have been implementing
in the patchset the MO throughput control. Other modules may follow if
required.
Please review, comments as always welcome. If no objections, will commit
next week.
(ONE argument that may be a decline reason is that the behavior WOULD
change with current setups, i.e. for SMPP connections this would imply
that we would also have MO throughput control for DLRs if the
'throughput = x' value is configured. A way to solve this is to keep
'throughput' for MT wise only, and introduce 'throughput-mo' as the new
one for MO control. Comments?)
Hi all,
we found two memory leaks in the patch, so please find attached a
revised revision of it.
Thanks,
Stipe
--
Best Regards,
Stipe Tolj
-------------------------------------------------------------------
Düsseldorf, NRW, Germany
Kannel Foundation tolj.org system architecture
http://www.kannel.org/ http://www.tolj.org/
[email protected] [email protected]
-------------------------------------------------------------------
Index: ChangeLog
===================================================================
--- ChangeLog (revision 5323)
+++ ChangeLog (working copy)
@@ -1,3 +1,13 @@
+2022-06-03 Stipe Tolj <stolj at kannel.org>
+ * doc/userguide/userguide.xml: document 'throughput[-mt|-mo]' seperate
values.
+ * gw/smscconn_p.h: use seperate values for MT/MO throughput configurations.
+ * gw/smscconn.c: handle config directives for seperate MT/MO throughput.
+ * gw/smsc/smsc_*.c: use throughput_mt for MT based throughput control.
+ * gw/smsc/smsc_[http|smpp].c: implement MO throughput control.
+ * gwlib/cfg.def: add 'throughput[-mt|-mo]' config directives.
+ This patchset introduces the concept of MO throughput control by own config
+ directives and implementations in SMSC types HTTP and SMPP.
+
2022-05-19 Stipe Tolj <stolj at kannel.org>
* doc/userguide/userguide.xml: add comment about default smsbox-route.
* gw/bb_boxc.c: allow setting a default 'group = smsbox-route' without
Index: doc/userguide/userguide.xml
===================================================================
--- doc/userguide/userguide.xml (revision 5324)
+++ doc/userguide/userguide.xml (working copy)
@@ -2807,13 +2807,30 @@
recommended. The name is case-insensitive.
</entry></row>
- <row><entry><literal>throughput</literal></entry>
+ <row><entry><literal>throughput (r)</literal></entry>
<entry><literal>float (messages/sec)</literal></entry>
<entry valign="bottom">
If SMSC requires that Kannel limits the number of messages per second,
- use this variable. This is considered as active throttling. (optional)
+ use this variable. This is considered as active throttling. It
constraints
+ both sides, MT and MO with the same value. (optional)
</entry></row>
+ <row><entry><literal>throughput-mt (r)</literal></entry>
+ <entry><literal>float (messages/sec)</literal></entry>
+ <entry valign="bottom">
+ If SMSC requires that Kannel limits the number of messages per second,
+ use this variable. This is considered as active throttling. It
constraints
+ only the MT side. (optional)
+ </entry></row>
+
+ <row><entry><literal>throughput-mo (r)</literal></entry>
+ <entry><literal>float (messages/sec)</literal></entry>
+ <entry valign="bottom">
+ If SMSC requires that Kannel limits the number of messages per second,
+ use this variable. This is considered as active throttling. It
constraints
+ only the MO side. (optional)
+ </entry></row>
+
<row><entry><literal>denied-smsc-id</literal></entry>
<entry><literal>id-list</literal></entry>
<entry valign="bottom">
Index: gw/smsc/smsc_at.c
===================================================================
--- gw/smsc/smsc_at.c (revision 5315)
+++ gw/smsc/smsc_at.c (working copy)
@@ -2215,9 +2215,9 @@
if (privdata->modem->enable_mms &&
gw_prioqueue_len(privdata->outgoing_queue) > 1)
at2_send_modem_command(privdata, "AT+CMMS=2", 0, 0);
- if (privdata->conn->throughput > 0 && load_get(privdata->load, 0) >=
privdata->conn->throughput) {
- debug("bb.sms.at2", 0, "AT2[%s]: throughput limit exceeded (load: %.02f,
throughput: %.02f)",
- octstr_get_cstr(privdata->conn->id), load_get(privdata->load, 0),
privdata->conn->throughput);
+ if (privdata->conn->throughput_mt > 0 && load_get(privdata->load, 0) >=
privdata->conn->throughput_mt) {
+ debug("bb.sms.at2", 0, "AT2[%s]: MT throughput limit exceeded (load:
%.02f, throughput: %.02f)",
+ octstr_get_cstr(privdata->conn->id), load_get(privdata->load, 0),
privdata->conn->throughput_mt);
} else {
if ((msg = gw_prioqueue_remove(privdata->outgoing_queue))) {
load_increase(privdata->load);
Index: gw/smsc/smsc_emi.c
===================================================================
--- gw/smsc/smsc_emi.c (revision 5315)
+++ gw/smsc/smsc_emi.c (working copy)
@@ -1008,8 +1008,8 @@
Msg *msg;
double delay = 0;
- if (conn->throughput > 0) {
- delay = 1.0 / conn->throughput;
+ if (conn->throughput_mt > 0) {
+ delay = 1.0 / conn->throughput_mt;
}
/* Send messages if there's room in the sending window */
@@ -1017,7 +1017,7 @@
(msg = gw_prioqueue_remove(PRIVDATA(conn)->outgoing_queue)) !=
NULL) {
int nexttrn = emi2_next_trn(conn);
- if (conn->throughput > 0)
+ if (conn->throughput_mt > 0)
gwthread_sleep(delay);
/* convert the generic Kannel message into an EMI type message */
Index: gw/smsc/smsc_fake.c
===================================================================
--- gw/smsc/smsc_fake.c (revision 5315)
+++ gw/smsc/smsc_fake.c (working copy)
@@ -268,8 +268,8 @@
Msg *msg;
double delay = 0;
- if (conn->throughput > 0) {
- delay = 1.0 / conn->throughput;
+ if (conn->throughput_mt > 0) {
+ delay = 1.0 / conn->throughput_mt;
}
while (1) {
@@ -335,7 +335,7 @@
}
/* obey throughput speed limit, if any */
- if (conn->throughput > 0) {
+ if (conn->throughput_mt > 0) {
gwthread_sleep(delay);
}
}
Index: gw/smsc/smsc_http.c
===================================================================
--- gw/smsc/smsc_http.c (revision 5315)
+++ gw/smsc/smsc_http.c (working copy)
@@ -165,7 +165,13 @@
HTTPClient *client;
Octstr *ip, *url, *body;
List *headers, *cgivars;
+ double delay = 0;
+ /* throughput delay, if configured */
+ if (conn->throughput_mo > 0) {
+ delay = 1.0 / conn->throughput_mo;
+ }
+
/* Make sure we log into our own log-file if defined */
log_thread_to(conn->log_idx);
@@ -189,13 +195,19 @@
debug("smsc.http", 0, "HTTP[%s]: Got request `%s'",
octstr_get_cstr(conn->id), octstr_get_cstr(url));
+ /* obey throughput speed limit, if any */
+ if (conn->throughput_mo > 0) {
+ gwthread_sleep(delay);
+ }
+
if (connect_denied(conndata->allow_ip, ip)) {
info(0, "HTTP[%s]: Connection `%s' tried from denied "
"host %s, ignored", octstr_get_cstr(conn->id),
octstr_get_cstr(url), octstr_get_cstr(ip));
http_close_client(client);
- } else
+ } else {
conndata->callbacks->receive_sms(conn, client, headers, body,
cgivars);
+ }
debug("smsc.http", 0, "HTTP[%s]: Destroying client information",
octstr_get_cstr(conn->id));
@@ -249,8 +261,8 @@
/* Make sure we log into our own log-file if defined */
log_thread_to(conn->log_idx);
- if (conn->throughput) {
- delay = 1.0 / conn->throughput;
+ if (conn->throughput_mt) {
+ delay = 1.0 / conn->throughput_mt;
}
while (conndata->shutdown == 0) {
@@ -269,7 +281,7 @@
break;
/* obey throughput speed limit, if any */
- if (conn->throughput > 0) {
+ if (conn->throughput_mt > 0) {
gwthread_sleep(delay);
}
counter_increase(conndata->open_sends);
Index: gw/smsc/smsc_smasi.c
===================================================================
--- gw/smsc/smsc_smasi.c (revision 5315)
+++ gw/smsc/smsc_smasi.c (working copy)
@@ -872,8 +872,8 @@
if (*pending_submits == -1) return;
- if (smasi->conn->throughput > 0) {
- delay = 1.0 / smasi->conn->throughput;
+ if (smasi->conn->throughput_mt > 0) {
+ delay = 1.0 / smasi->conn->throughput_mt;
}
while (*pending_submits < MAX_PENDING_SUBMITS) {
@@ -894,7 +894,7 @@
smasi_pdu_destroy(pdu);
/* obey throughput speed limit, if any */
- if (smasi->conn->throughput > 0)
+ if (smasi->conn->throughput_mt > 0)
gwthread_sleep(delay);
++(*pending_submits);
Index: gw/smsc/smsc_smpp.c
===================================================================
--- gw/smsc/smsc_smpp.c (revision 5315)
+++ gw/smsc/smsc_smpp.c (working copy)
@@ -184,7 +184,8 @@
int wait_ack_action;
int esm_class;
long log_format;
- Load *load;
+ Load *load_mt;
+ Load *load_mo;
SMSCConn *conn;
} SMPP;
@@ -284,8 +285,10 @@
smpp->bind_addr_npi = 0;
smpp->use_ssl = 0;
smpp->ssl_client_certkey_file = NULL;
- smpp->load = load_create_real(0);
- load_add_interval(smpp->load, 1);
+ smpp->load_mt = load_create_real(0);
+ load_add_interval(smpp->load_mt, 1);
+ smpp->load_mo = load_create_real(0);
+ load_add_interval(smpp->load_mo, 1);
smpp->esm_class = esm_class;
return smpp;
@@ -309,7 +312,8 @@
octstr_destroy(smpp->alt_charset);
octstr_destroy(smpp->alt_addr_charset);
octstr_destroy(smpp->ssl_client_certkey_file);
- load_destroy(smpp->load);
+ load_destroy(smpp->load_mt);
+ load_destroy(smpp->load_mo);
gw_free(smpp);
}
}
@@ -1264,13 +1268,13 @@
while (*pending_submits < smpp->max_pending_submits) {
/* check our throughput */
- if (smpp->conn->throughput > 0 && load_get(smpp->load, 0) >=
smpp->conn->throughput) {
- debug("bb.sms.smpp", 0, "SMPP[%s]: throughput limit exceeded
(%.02f,%.02f)",
- octstr_get_cstr(smpp->conn->id), load_get(smpp->load, 0),
smpp->conn->throughput);
+ if (smpp->conn->throughput_mt > 0 && load_get(smpp->load_mt, 0) >=
smpp->conn->throughput_mt) {
+ debug("bb.sms.smpp", 0, "SMPP[%s]: MT throughput limit exceeded
(%.02f,%.02f)",
+ octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mt, 0),
smpp->conn->throughput_mt);
break;
}
- debug("bb.sms.smpp", 0, "SMPP[%s]: throughput (%.02f,%.02f)",
- octstr_get_cstr(smpp->conn->id), load_get(smpp->load, 0),
smpp->conn->throughput);
+ debug("bb.sms.smpp", 0, "SMPP[%s]: MT throughput (%.02f,%.02f)",
+ octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mt, 0),
smpp->conn->throughput_mt);
/* Get next message, quit if none to be sent */
msg = gw_prioqueue_remove(smpp->msgs_to_send);
@@ -1291,7 +1295,7 @@
smpp_pdu_destroy(pdu);
octstr_destroy(os);
++(*pending_submits);
- load_increase(smpp->load);
+ load_increase(smpp->load_mt);
}
else { /* write error occurs */
smpp_pdu_destroy(pdu);
@@ -1752,6 +1756,7 @@
return 0;
}
resp = smpp_pdu_create(data_sm_resp,
pdu->u.data_sm.sequence_number);
+
/*
* If SMSCConn stopped then send temp. error code
*/
@@ -1762,6 +1767,18 @@
break;
}
mutex_unlock(smpp->conn->flow_mutex);
+
+ /* check our throughput */
+ if (smpp->conn->throughput_mo > 0 && load_get(smpp->load_mo, 0) >=
smpp->conn->throughput_mo) {
+ debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput limit
exceeded (%.02f,%.02f)",
+ octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo,
0), smpp->conn->throughput_mo);
+ resp->u.data_sm_resp.command_status = SMPP_ESME_RX_T_APPN;
+ break;
+ }
+ debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput (%.02f,%.02f)",
+ octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo, 0),
smpp->conn->throughput_mo);
+ load_increase(smpp->load_mo);
+
/* got a deliver ack (DLR)?
* NOTE: following SMPP v3.4. spec. we are interested
* only on bits 2-5 (some SMSC's send 0x44, and it's
@@ -1818,6 +1835,7 @@
octstr_get_cstr(smpp->conn->id), pdu->type_name);
return 0;
}
+ resp = smpp_pdu_create(deliver_sm_resp,
pdu->u.deliver_sm.sequence_number);
/*
* If SMSCConn stopped then send temp. error code
@@ -1825,13 +1843,22 @@
mutex_lock(smpp->conn->flow_mutex);
if (smpp->conn->is_stopped) {
mutex_unlock(smpp->conn->flow_mutex);
- resp = smpp_pdu_create(deliver_sm_resp,
- pdu->u.deliver_sm.sequence_number);
resp->u.deliver_sm_resp.command_status = SMPP_ESME_RX_T_APPN;
break;
}
mutex_unlock(smpp->conn->flow_mutex);
+ /* check our throughput */
+ if (smpp->conn->throughput_mo > 0 && load_get(smpp->load_mo, 0) >=
smpp->conn->throughput_mo) {
+ debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput limit
exceeded (%.02f,%.02f)",
+ octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo,
0), smpp->conn->throughput_mo);
+ resp->u.deliver_sm_resp.command_status = SMPP_ESME_RX_T_APPN;
+ break;
+ }
+ debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput (%.02f,%.02f)",
+ octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo, 0),
smpp->conn->throughput_mo);
+ load_increase(smpp->load_mo);
+
/*
* Got a deliver ack (DLR)?
* NOTE: following SMPP v3.4. spec. we are interested
@@ -1845,7 +1872,6 @@
dlrmsg = handle_dlr(smpp, pdu->u.deliver_sm.source_addr,
pdu->u.deliver_sm.short_message, pdu->u.deliver_sm.message_payload,
pdu->u.deliver_sm.receipted_message_id,
pdu->u.deliver_sm.message_state, pdu->u.deliver_sm.network_error_code);
- resp = smpp_pdu_create(deliver_sm_resp,
pdu->u.deliver_sm.sequence_number);
if (dlrmsg != NULL) {
if (dlrmsg->sms.meta_data == NULL)
dlrmsg->sms.meta_data = octstr_create("");
@@ -1865,8 +1891,7 @@
}
resp->u.deliver_sm_resp.command_status =
smscconn_failure_reason_to_smpp_status(reason);
} else {/* MO-SMS */
- resp = smpp_pdu_create(deliver_sm_resp,
- pdu->u.deliver_sm.sequence_number);
+
/* ensure the smsc-id is set */
msg = pdu_to_msg(smpp, pdu, &reason);
if (msg == NULL) {
@@ -2410,9 +2435,9 @@
smpp->throttling_err_time > 0 && pending_submits <
smpp->max_pending_submits) {
time_t tr_timeout = smpp->throttling_err_time +
SMPP_THROTTLING_SLEEP_TIME - now;
timeout = timeout > tr_timeout ? tr_timeout : timeout;
- } else if (transmitter && gw_prioqueue_len(smpp->msgs_to_send)
> 0 && smpp->conn->throughput > 0 &&
+ } else if (transmitter && gw_prioqueue_len(smpp->msgs_to_send)
> 0 && smpp->conn->throughput_mt > 0 &&
smpp->max_pending_submits > pending_submits) {
- double t = 1.0 / smpp->conn->throughput;
+ double t = 1.0 / smpp->conn->throughput_mt;
timeout = t < timeout ? t : timeout;
}
/* sleep a while */
Index: gw/smscconn.c
===================================================================
--- gw/smscconn.c (revision 5315)
+++ gw/smscconn.c (working copy)
@@ -336,11 +336,28 @@
panic(0, "Could not compile pattern '%s'",
octstr_get_cstr(preferred_prefix_regex));
if ((tmp = cfg_get(grp, octstr_imm("throughput"))) != NULL) {
- if (octstr_parse_double(&conn->throughput, tmp, 0) == -1)
- conn->throughput = 0;
+ if (octstr_parse_double(&conn->throughput_mt, tmp, 0) == -1) {
+ conn->throughput_mt = conn->throughput_mo = 0;
+ } else {
+ conn->throughput_mo = conn->throughput_mt;
+ }
octstr_destroy(tmp);
- info(0, "Set throughput to %.3f for smsc id <%s>", conn->throughput,
octstr_get_cstr(conn->id));
+ info(0, "Set MT/MO throughput to %.3f for smsc id <%s>",
conn->throughput_mt, octstr_get_cstr(conn->id));
}
+ else {
+ if ((tmp = cfg_get(grp, octstr_imm("throughput-mt"))) != NULL) {
+ if (octstr_parse_double(&conn->throughput_mt, tmp, 0) == -1)
+ conn->throughput_mt = 0;
+ octstr_destroy(tmp);
+ info(0, "Set MT throughput to %.3f for smsc id <%s>",
conn->throughput_mt, octstr_get_cstr(conn->id));
+ }
+ if ((tmp = cfg_get(grp, octstr_imm("throughput-mo"))) != NULL) {
+ if (octstr_parse_double(&conn->throughput_mo, tmp, 0) == -1)
+ conn->throughput_mo = 0;
+ octstr_destroy(tmp);
+ info(0, "Set MO throughput to %.3f for smsc id <%s>",
conn->throughput_mo, octstr_get_cstr(conn->id));
+ }
+ }
/* Sets the admin_id. Equals to connection id if empty */
GET_OPTIONAL_VAL(conn->admin_id, "smsc-admin-id");
if (conn->admin_id == NULL)
Index: gw/smscconn_p.h
===================================================================
--- gw/smscconn_p.h (revision 5315)
+++ gw/smscconn_p.h (working copy)
@@ -200,7 +200,8 @@
int alt_dcs; /* use alternate DCS 0xFX */
- double throughput; /* message thoughput per sec. to be delivered to
SMSC */
+ double throughput_mt; /* message throughput per sec. to be delivered to
SMSC */
+ double throughput_mo; /* message throughput per sec. to be receiver from
SMSC */
/* Stores rerouting information for this specific smsc-id */
int reroute; /* simply turn MO into MT and process
internally */
Index: gwlib/cfg.def
===================================================================
--- gwlib/cfg.def (revision 5315)
+++ gwlib/cfg.def (working copy)
@@ -330,6 +330,8 @@
OCTSTR(our-host)
OCTSTR(alt-dcs)
OCTSTR(throughput)
+ OCTSTR(throughput-mt)
+ OCTSTR(throughput-mo)
OCTSTR(dead-start)
OCTSTR(alt-charset)
OCTSTR(host)