ADD: support for subscribing to topics on the ntfy server
CLOSES: #5 #8 #10 TIME: 3h
This commit is contained in:
parent
9f00721e49
commit
fc04cea1b5
@ -106,10 +106,12 @@ sub NTFY_CLIENT_Initialize
|
|||||||
{
|
{
|
||||||
my ($hash) = @_;
|
my ($hash) = @_;
|
||||||
|
|
||||||
|
$hash->{Match} = "^NTFY:.*";
|
||||||
$hash->{DefFn} = 'NTFY_Define';
|
$hash->{DefFn} = 'NTFY_Define';
|
||||||
$hash->{SetFn} = 'NTFY_Set';
|
$hash->{SetFn} = 'NTFY_Set';
|
||||||
$hash->{ReadFn} = 'NTFY_Read';
|
$hash->{ReadFn} = 'NTFY_Read';
|
||||||
$hash->{AttrFn} = 'NTFY_Attr';
|
$hash->{AttrFn} = 'NTFY_Attr';
|
||||||
|
$hash->{ParseFn} = 'NTFY_Parse';
|
||||||
$hash->{AttrList} = "defaultTopic "
|
$hash->{AttrList} = "defaultTopic "
|
||||||
. $readingFnAttributes;
|
. $readingFnAttributes;
|
||||||
|
|
||||||
@ -137,9 +139,7 @@ sub NTFY_Define
|
|||||||
$hash->{STATE} = "unknown";
|
$hash->{STATE} = "unknown";
|
||||||
$hash->{USERNAME} = $h->{user} || "";
|
$hash->{USERNAME} = $h->{user} || "";
|
||||||
$hash->{helper}{PASSWORD} = $h->{password};
|
$hash->{helper}{PASSWORD} = $h->{password};
|
||||||
|
$modules{NTFY_CLIENT}{defptr}{$hash->{SERVER}} = $hash;
|
||||||
my @topics;
|
|
||||||
$hash->{helper}->{topics} = \@topics;
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -147,15 +147,15 @@ sub NTFY_Define
|
|||||||
sub NTFY_Update_Subscriptions_Readings
|
sub NTFY_Update_Subscriptions_Readings
|
||||||
{
|
{
|
||||||
my $hash = shift;
|
my $hash = shift;
|
||||||
|
my @topics;
|
||||||
|
|
||||||
my $subscriptions="";
|
for my $k (keys %{$modules{NTFY_TOPIC}{defptr}})
|
||||||
for my $thash (@{$hash->{helper}->{subscriptions}})
|
|
||||||
{
|
{
|
||||||
$subscriptions .= $hash->{TOPIC} . ",";
|
$k=~/^(.*)_(.*)$/;
|
||||||
|
push(@topics,$2);
|
||||||
}
|
}
|
||||||
chop $subscriptions;
|
|
||||||
|
readingsSingleUpdate($hash,"subscriptions", join(",", @topics),1);
|
||||||
readingsSingleUpdate($hash,"subscriptions", $subscriptions,1);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,131 +164,14 @@ sub NTFY_newSubscription
|
|||||||
my $hash = shift;
|
my $hash = shift;
|
||||||
my $topic = shift;
|
my $topic = shift;
|
||||||
|
|
||||||
my $thash = {};
|
my $newDeviceName = makeDeviceName($hash->{NAME} . "_" . $topic);
|
||||||
|
my $token = NFTY_Calc_Auth_Token($hash->{helper}->{PASSWORD},$hash->{USERNAME});
|
||||||
$thash->{NAME} = makeDeviceName("NTFY_TOPIC_" . $topic);
|
|
||||||
|
|
||||||
$thash->{TYPE} = $hash->{TYPE};
|
fhem("define $newDeviceName NTFY_TOPIC " . $hash->{SERVER} . " " . $token . " " . $topic);
|
||||||
$thash->{NR} = $devcount++;
|
|
||||||
|
|
||||||
$thash->{phash} = $hash;
|
|
||||||
$thash->{PNAME} = $hash->{NAME};
|
|
||||||
|
|
||||||
$thash->{TOPIC} = $topic;
|
|
||||||
$thash->{SERVER} = $hash->{SERVER};
|
|
||||||
$thash->{USERNAME} = $hash->{USERNAME} || "";
|
|
||||||
$thash->{helper}->{PASSWORD} = $hash->{helper}->{PASSWORD};
|
|
||||||
|
|
||||||
$thash->{TEMPORARY} = 1;
|
|
||||||
|
|
||||||
my $useSSL = 0;
|
|
||||||
if ($thash->{SERVER}=~/https/)
|
|
||||||
{
|
|
||||||
$useSSL = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
my $port = $useSSL == 1 ? 443 : 80;
|
|
||||||
if ($thash->{SERVER}=~/:(\d+)/)
|
|
||||||
{
|
|
||||||
$port = $1;
|
|
||||||
}
|
|
||||||
|
|
||||||
my $dev = $thash->{SERVER} . ":" . $port . "/" . $thash->{TOPIC} . "/ws";
|
|
||||||
|
|
||||||
if ($thash->{helper}->{PASSWORD} && length($thash->{helper}->{PASSWORD}) > 0)
|
|
||||||
{
|
|
||||||
my $token = NFTY_Calc_Auth_Token($thash->{helper}->{PASSWORD},$thash->{USERNAME});
|
|
||||||
if (!$token)
|
|
||||||
{
|
|
||||||
NTFY_LOG(LOG_ERROR,"Can not suscribe to topic without valid token");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$dev .="?auth=" . $token;
|
|
||||||
}
|
|
||||||
|
|
||||||
# swap http(s) to expected ws(s)
|
|
||||||
$dev =~ s/^.*:\/\//wss:/;
|
|
||||||
|
|
||||||
# just for debugging purposes
|
|
||||||
NTFY_LOG(LOG_DEBUG,"using websocket url: " . $dev);
|
|
||||||
|
|
||||||
$thash->{DeviceName}=$dev;
|
|
||||||
$thash->{WEBSOCKET} = 1;
|
|
||||||
|
|
||||||
$attr{$thash->{NAME}}{room} = 'hidden';
|
|
||||||
$defs{$thash->{NAME}} = $thash;
|
|
||||||
|
|
||||||
DevIo_OpenDev( $thash, 0, "NTFY_WS_Handshake", "NTFY_WS_CB" );
|
|
||||||
|
|
||||||
# remember topic in main hash helper
|
|
||||||
push(@{$hash->{helper}->{topics}},$thash);
|
|
||||||
|
|
||||||
NTFY_Update_Subscriptions_Readings($hash);
|
NTFY_Update_Subscriptions_Readings($hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
sub NTFY_Topic_To_Hash
|
|
||||||
{
|
|
||||||
my $hash = shift;
|
|
||||||
my $topic = shift;
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
sub NTFY_WS_Handshake
|
|
||||||
{
|
|
||||||
my $hash = shift;
|
|
||||||
my $name = $hash->{NAME};
|
|
||||||
|
|
||||||
DevIo_SimpleWrite( $hash, '', 2 );
|
|
||||||
NTFY_LOG(LOG_DEBUG, "websocket connected");
|
|
||||||
|
|
||||||
readingsSingleUpdate($hash, "state", "online", 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
sub NTFY_WS_CB
|
|
||||||
{
|
|
||||||
my ($hash, $error) = @_;
|
|
||||||
|
|
||||||
my $name = $hash->{NAME};
|
|
||||||
|
|
||||||
if ($error)
|
|
||||||
{
|
|
||||||
readingsBeginUpdate($hash);
|
|
||||||
readingsBulkUpdate($hash, "state", "error");
|
|
||||||
readingsBulkUpdate($hash, "error", $error);
|
|
||||||
readingsEndUpdate($hash,1);
|
|
||||||
}
|
|
||||||
|
|
||||||
NTFY_LOG(LOG_ERROR, "error while connecting to websocket: $error ") if $error;
|
|
||||||
NTFY_LOG(LOG_DEBUG, "websocket callback called");
|
|
||||||
return $error;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
sub NTFY_Read
|
|
||||||
{
|
|
||||||
my ( $hash ) = @_;
|
|
||||||
|
|
||||||
my $buf = DevIo_SimpleRead($hash);
|
|
||||||
|
|
||||||
return unless length($buf) > 0;
|
|
||||||
|
|
||||||
my $msg = from_json($buf);
|
|
||||||
|
|
||||||
return unless $msg->{event} eq "message";
|
|
||||||
|
|
||||||
my $nrReceivedMessages = ReadingsVal($hash->{phash},"nrReceivedMessages",0);
|
|
||||||
+
|
|
||||||
$nrReceivedMessages++;
|
|
||||||
readingsBeginUpdate($hash->{phash});
|
|
||||||
readingsBulkUpdateIfChanged($hash->{phash},"nrReceivedMessages",$nrReceivedMessages++);
|
|
||||||
readingsBulkUpdateIfChanged($hash->{phash}, "lastReceivedTopic", $msg->{topic});
|
|
||||||
readingsEndUpdate($hash->{phash},1);
|
|
||||||
|
|
||||||
NTFY_LOG(LOG_DEBUG, $hash->{NAME} . " received: " . $buf);
|
|
||||||
|
|
||||||
}
|
|
||||||
sub NTFY_Publish_Msg
|
sub NTFY_Publish_Msg
|
||||||
{
|
{
|
||||||
my $hash = shift;
|
my $hash = shift;
|
||||||
@ -458,9 +341,14 @@ sub NTFY_Set
|
|||||||
|
|
||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
|
elsif ($cmd eq "subscribe")
|
||||||
|
{
|
||||||
|
NTFY_LOG(LOG_DEBUG, "full command: " . join(' ', @args));
|
||||||
|
NTFY_newSubscription($hash, $args[0]);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return "Unknown argument $cmd, choose one of publish"
|
return "Unknown argument $cmd, choose one of publish subscribe"
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -471,6 +359,55 @@ sub NTFY_Attr
|
|||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#
|
||||||
|
# Process the incoming notifications/ messages
|
||||||
|
#
|
||||||
|
sub NTFY_Process_Message
|
||||||
|
{
|
||||||
|
my $hash = shift;
|
||||||
|
my $msg = shift;
|
||||||
|
|
||||||
|
my $msgData = from_json($msg);
|
||||||
|
|
||||||
|
return unless $msgData->{event} eq "message";
|
||||||
|
|
||||||
|
my $nrReceivedMessages = ReadingsVal($hash->{NAME}, "nrReceivedMessages", 0);
|
||||||
|
$nrReceivedMessages++;
|
||||||
|
|
||||||
|
readingsBeginUpdate($hash);
|
||||||
|
readingsBulkUpdateIfChanged($hash,"nrReceivedMessages",$nrReceivedMessages);
|
||||||
|
readingsBulkUpdateIfChanged($hash,"lastReceivedTopic",$msgData->{topic}) unless !$msgData->{topic};
|
||||||
|
readingsBulkUpdateIfChanged($hash,"lastReceivedTitle",$msgData->{title}) unless !$msgData->{title};
|
||||||
|
readingsBulkUpdateIfChanged($hash,"lastReceivedData", $msgData->{message}) unless !$msgData->{message};
|
||||||
|
readingsBulkUpdateIfChanged($hash,"lastReceivedRawMessage", $msg);
|
||||||
|
readingsEndUpdate($hash,1);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#
|
||||||
|
# Parse incoming notifications from websocket clients
|
||||||
|
#
|
||||||
|
sub NTFY_Parse ($$)
|
||||||
|
{
|
||||||
|
my ( $ioHash, $message) = @_;
|
||||||
|
|
||||||
|
return unless (substr($message,0,5) eq "NTFY:");
|
||||||
|
|
||||||
|
$message=~/^NTFY:(.*)---(.*)$/;
|
||||||
|
|
||||||
|
my $server = $1;
|
||||||
|
my $msg = $2;
|
||||||
|
|
||||||
|
if(my $hash = $modules{NTFY_CLIENT}{defptr}{$server})
|
||||||
|
{
|
||||||
|
NTFY_Update_Subscriptions_Readings($hash);
|
||||||
|
NTFY_Process_Message($hash, $msg);
|
||||||
|
return $hash->{NAME};
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
1;
|
1;
|
||||||
|
|
||||||
@ -537,6 +474,12 @@ sub NTFY_Attr
|
|||||||
</ul>
|
</ul>
|
||||||
Everything without a prefix is considered the messages and is concatenated.
|
Everything without a prefix is considered the messages and is concatenated.
|
||||||
</dd>
|
</dd>
|
||||||
|
<dt><b>subscribe</b></dt>
|
||||||
|
<dd>
|
||||||
|
This set command subscribes to the specified topic on the configured ntfy server.
|
||||||
|
For this the client adds a NTFY_TOPIC device which is responsible for the websocket
|
||||||
|
management. Message processing is done in NTFY_CLIENT.
|
||||||
|
</dd>
|
||||||
</dl>
|
</dl>
|
||||||
|
|
||||||
<a id="NTFY_CLIENT-get"></a>
|
<a id="NTFY_CLIENT-get"></a>
|
||||||
|
216
FHEM/98_NTFY_TOPIC.pm
Normal file
216
FHEM/98_NTFY_TOPIC.pm
Normal file
@ -0,0 +1,216 @@
|
|||||||
|
=head1 NAME
|
||||||
|
|
||||||
|
NTFY_TOPIC - "physical" client for ntfy.sh based servers to receive notifications from topics
|
||||||
|
|
||||||
|
=head1 LICENSE AND COPYRIGHT
|
||||||
|
|
||||||
|
Copyright (C) 2024 by Dominik Meyer
|
||||||
|
|
||||||
|
This program is free software: you can redistribute it and/or modify it under the terms of the
|
||||||
|
GNU General Public License as published by the Free Software Foundation, either version 3 of the
|
||||||
|
License, or (at your option) any later version.
|
||||||
|
|
||||||
|
This module is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
|
||||||
|
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
See the GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License along with this program.
|
||||||
|
If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
=head1 DESCRIPTION
|
||||||
|
|
||||||
|
This module is the "physical" device to connect a topic on a ntfy compatible server with the NTFY_CLIENT.
|
||||||
|
|
||||||
|
=head1 AUTHORS
|
||||||
|
|
||||||
|
Dominik Meyer <dmeyer@federationhq.de>
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
package main;
|
||||||
|
|
||||||
|
# enforce strict and warnings mode
|
||||||
|
use strict;
|
||||||
|
use warnings;
|
||||||
|
|
||||||
|
# required for sending and receiving data from ntfy.sh
|
||||||
|
use LWP::UserAgent;
|
||||||
|
use HTTP::Request;
|
||||||
|
use URI;
|
||||||
|
use JSON;
|
||||||
|
use Text::ParseWords;
|
||||||
|
use HttpUtils;
|
||||||
|
use FHEM::Text::Unicode qw(:ALL);
|
||||||
|
use Data::Dumper;
|
||||||
|
|
||||||
|
# some module wide constansts
|
||||||
|
my $MODULE_NAME="NTFY-TOPIC";
|
||||||
|
my $VERSION = '0.0.1';
|
||||||
|
|
||||||
|
|
||||||
|
use constant {
|
||||||
|
LOG_CRITICAL => 0,
|
||||||
|
LOG_ERROR => 1,
|
||||||
|
LOG_WARNING => 2,
|
||||||
|
LOG_SEND => 3,
|
||||||
|
LOG_RECEIVE => 4,
|
||||||
|
LOG_DEBUG => 5,
|
||||||
|
|
||||||
|
PRIO_DEFAULT => 3,
|
||||||
|
PRIO_HIGH => 5,
|
||||||
|
PRIO_LOW => 2
|
||||||
|
};
|
||||||
|
|
||||||
|
# NTFY logging method
|
||||||
|
sub NTFY_TOPIC_LOG
|
||||||
|
{
|
||||||
|
my $verbosity = shift;
|
||||||
|
my $msg = shift;
|
||||||
|
|
||||||
|
Log3 $MODULE_NAME, $verbosity, $MODULE_NAME . ":" . $msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
# initialize the NTFY Topic Module
|
||||||
|
sub NTFY_TOPIC_Initialize
|
||||||
|
{
|
||||||
|
my ($hash) = @_;
|
||||||
|
|
||||||
|
$hash->{DefFn} = 'NTFY_TOPIC_Define';
|
||||||
|
$hash->{ReadFn} = 'NTFY_TOPIC_Read';
|
||||||
|
$hash->{WriteFn} = 'NTFY_TOPIC_Write';
|
||||||
|
$hash->{AttrFn} = 'NTFY_TOPIC_Attr';
|
||||||
|
$hash->{AttrList} = $readingFnAttributes;
|
||||||
|
|
||||||
|
$hash->{MatchList} = {"1:NTFY_CLIENT" => "^NTFY:.*"};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub NTFY_TOPIC_Define
|
||||||
|
{
|
||||||
|
my ($hash, $define) = @_;
|
||||||
|
|
||||||
|
# ensure we have something to parse
|
||||||
|
if (!$define)
|
||||||
|
{
|
||||||
|
warn("$MODULE_NAME: no module definition provided");
|
||||||
|
NTFY_TOPIC_LOG(LOG_ERROR,"no module definition provided");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
# parse parameters into array and hash
|
||||||
|
my($params, $h) = parseParams($define);
|
||||||
|
|
||||||
|
my $name = makeDeviceName($params->[0]);
|
||||||
|
|
||||||
|
$hash->{NAME} = $name;
|
||||||
|
$hash->{SERVER} = $params->[2];
|
||||||
|
$hash->{helper}{authString} = $params->[3];
|
||||||
|
$hash->{TOPIC} = $params->[4];
|
||||||
|
$hash->{Clients} = "NTFY_CLIENT";
|
||||||
|
$hash->{ClientsKeepOrder} = 1;
|
||||||
|
$hash->{STATE} = "unknown";
|
||||||
|
$modules{NTFY_TOPIC}{defptr}{$hash->{SERVER} . "_" . $hash->{TOPIC}} = $hash;
|
||||||
|
|
||||||
|
$attr{$hash->{NAME}}{room} = 'hidden';
|
||||||
|
|
||||||
|
my $useSSL = 0;
|
||||||
|
if ($hash->{SERVER}=~/https/)
|
||||||
|
{
|
||||||
|
$useSSL = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
my $port = $useSSL == 1 ? 443 : 80;
|
||||||
|
if ($hash->{SERVER}=~/:(\d+)/)
|
||||||
|
{
|
||||||
|
$hash->{PORT} = $1;
|
||||||
|
}
|
||||||
|
|
||||||
|
my $dev = $hash->{SERVER} . ":" . $port . "/" . $hash->{TOPIC} . "/ws";
|
||||||
|
if ($hash->{helper}{authString} && length($hash->{helper}{authString})>0)
|
||||||
|
{
|
||||||
|
$dev .="?auth=" . $hash->{helper}{authString};
|
||||||
|
}
|
||||||
|
|
||||||
|
# swap http(s) to expected ws(s)
|
||||||
|
$dev =~ s/^.*:\/\//wss:/;
|
||||||
|
# just for debugging purposes
|
||||||
|
NTFY_TOPIC_LOG(LOG_DEBUG,"using websocket url: " . $dev);
|
||||||
|
|
||||||
|
$hash->{DeviceName}=$dev;
|
||||||
|
$hash->{WEBSOCKET} = 1;
|
||||||
|
|
||||||
|
DevIo_OpenDev( $hash, 0, "NTFY_WS_Handshake", "NTFY_WS_CB" );
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub NTFY_TOPIC_Attr
|
||||||
|
{
|
||||||
|
my ( $cmd, $name, $aName, $aValue ) = @_;
|
||||||
|
return undef;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub NTFY_WS_Handshake
|
||||||
|
{
|
||||||
|
my $hash = shift;
|
||||||
|
my $name = $hash->{NAME};
|
||||||
|
|
||||||
|
#DevIo_SimpleWrite( $hash, ' ', 2 );
|
||||||
|
NTFY_TOPIC_LOG(LOG_DEBUG, "websocket connected");
|
||||||
|
|
||||||
|
readingsSingleUpdate($hash, "state", "connected", 1);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub NTFY_WS_CB
|
||||||
|
{
|
||||||
|
my ($hash, $error) = @_;
|
||||||
|
|
||||||
|
my $name = $hash->{NAME};
|
||||||
|
|
||||||
|
if ($error)
|
||||||
|
{
|
||||||
|
readingsBeginUpdate($hash);
|
||||||
|
readingsBulkUpdate($hash, "state", "error");
|
||||||
|
readingsBulkUpdate($hash, "error", $error);
|
||||||
|
readingsEndUpdate($hash,1);
|
||||||
|
|
||||||
|
NTFY_TOPIC_LOG(LOG_ERROR, "error while connecting to websocket: $error ") if $error;
|
||||||
|
NTFY_TOPIC_LOG(LOG_DEBUG, "websocket callback called");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
readingsBeginUpdate($hash);
|
||||||
|
readingsBulkUpdate($hash, "state", "online");
|
||||||
|
readingsBulkUpdate($hash, "error", "none");
|
||||||
|
readingsEndUpdate($hash,1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return $error;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub NTFY_TOPIC_Read
|
||||||
|
{
|
||||||
|
my ( $hash ) = @_;
|
||||||
|
my $buf = DevIo_SimpleRead($hash);
|
||||||
|
|
||||||
|
return unless length($buf) > 0;
|
||||||
|
|
||||||
|
my $msg = from_json($buf);
|
||||||
|
|
||||||
|
return unless $msg->{event} eq "message";
|
||||||
|
|
||||||
|
my $nrReceivedMessages = ReadingsVal($hash->{NAME},"nrReceivedMessages",0);
|
||||||
|
|
||||||
|
$nrReceivedMessages++;
|
||||||
|
readingsBeginUpdate($hash);
|
||||||
|
readingsBulkUpdateIfChanged($hash,"nrReceivedMessages",$nrReceivedMessages);
|
||||||
|
readingsEndUpdate($hash,1);
|
||||||
|
|
||||||
|
Dispatch($hash,"NTFY:" . $hash->{SERVER}. "---" . $buf,{},1);
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
Loading…
Reference in New Issue
Block a user