Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connection interrupted and resumed callbacks #26

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "AWSCRT"
uuid = "df31ea59-17a4-4ebd-9d69-4f45266dc2c7"
version = "0.3.3"
version = "0.4.0"

[deps]
CountDownLatches = "621fb831-fdad-4fff-93ac-1af7b7ed19e3"
Expand Down
12 changes: 8 additions & 4 deletions src/AWSMQTT.jl
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,16 @@ end

struct _OnConnectionInterruptedEvent
callback::Function
conn::MQTTConnection
error_code::Int
end

_dispatch_event(event::_OnConnectionInterruptedEvent) = Base.invokelatest(event.callback, event.error_code)
_dispatch_event(event::_OnConnectionInterruptedEvent) = Base.invokelatest(event.callback, event.conn, event.error_code)

mutable struct _OnConnectionInterruptedUserData # mutable so it is heap allocated and has a stable address
ch::Channel{Any}
callback::Function
conn::MQTTConnection
end

function _c_on_connection_interrupted(
Expand Down Expand Up @@ -232,16 +234,18 @@ end

struct _OnConnectionResumedEvent
callback::Function
conn::MQTTConnection
return_code::aws_mqtt_connect_return_code
session_present::Bool
end

_dispatch_event(event::_OnConnectionResumedEvent) =
Base.invokelatest(event.callback, event.return_code, event.session_present)
Base.invokelatest(event.callback, event.conn, event.return_code, event.session_present)

mutable struct _OnConnectionResumedUserData # mutable so it is heap allocated and has a stable address
ch::Channel{Any}
callback::Function
conn::MQTTConnection
end

function _c_on_connection_resumed(
Expand Down Expand Up @@ -404,7 +408,7 @@ function connect(

# this ud must persist until the connection is closed
on_connection_interrupted_ud, on_connection_interrupted_udp = if on_connection_interrupted !== nothing
ud = _OnConnectionInterruptedUserData(connection.events, on_connection_interrupted)
ud = _OnConnectionInterruptedUserData(connection.events, on_connection_interrupted, connection)
udp = Base.pointer_from_objref(ud)
lock(_C_IDS_LOCK) do
# TODO we leak these refs, they are never freed
Expand All @@ -418,7 +422,7 @@ function connect(

# this ud must persist until the connection is closed
on_connection_resumed_ud, on_connection_resumed_udp = if on_connection_resumed !== nothing
ud = _OnConnectionResumedUserData(connection.events, on_connection_resumed)
ud = _OnConnectionResumedUserData(connection.events, on_connection_resumed, connection)
udp = Base.pointer_from_objref(ud)
lock(_C_IDS_LOCK) do
# TODO we leak these refs, they are never freed
Expand Down
5 changes: 3 additions & 2 deletions test/mqtt_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ end
GC.gc(true)
start_bytes = Base.gc_live_bytes()
start_nids = length(AWSCRT._C_IDS)
for _ = 1:1000
n_msgs = 1000
for _ = 1:n_msgs
task, id = publish(connection, topic1, payload1, AWS_MQTT_QOS_AT_LEAST_ONCE)
@test fetch(task) == Dict(:packet_id => id)

Expand All @@ -280,7 +281,7 @@ end
end_bytes = Base.gc_live_bytes()
end_nids = length(AWSCRT._C_IDS)
@show start_bytes end_bytes start_nids end_nids
@test end_bytes start_bytes rtol = 0.01
@test ((end_bytes - start_bytes) / n_msgs) < 500 # TODO ideally we are not leaking, but 1.9 is doing something weird. will drop support when 1.10 is officially the new LTS
@test start_nids == end_nids

fetch(unsubscribe(connection, topic1)[1])
Expand Down
Loading