diff --git a/FHEM/98_NTFY_CLIENT.pm b/FHEM/98_NTFY_CLIENT.pm index 3b36e1f..188fc2d 100644 --- a/FHEM/98_NTFY_CLIENT.pm +++ b/FHEM/98_NTFY_CLIENT.pm @@ -106,10 +106,12 @@ sub NTFY_CLIENT_Initialize { my ($hash) = @_; + $hash->{Match} = "^NTFY:.*"; $hash->{DefFn} = 'NTFY_Define'; $hash->{SetFn} = 'NTFY_Set'; $hash->{ReadFn} = 'NTFY_Read'; $hash->{AttrFn} = 'NTFY_Attr'; + $hash->{ParseFn} = 'NTFY_Parse'; $hash->{AttrList} = "defaultTopic " . $readingFnAttributes; @@ -137,9 +139,7 @@ sub NTFY_Define $hash->{STATE} = "unknown"; $hash->{USERNAME} = $h->{user} || ""; $hash->{helper}{PASSWORD} = $h->{password}; - - my @topics; - $hash->{helper}->{topics} = \@topics; + $modules{NTFY_CLIENT}{defptr}{$hash->{SERVER}} = $hash; return; } @@ -147,15 +147,15 @@ sub NTFY_Define sub NTFY_Update_Subscriptions_Readings { my $hash = shift; + my @topics; - my $subscriptions=""; - for my $thash (@{$hash->{helper}->{subscriptions}}) + for my $k (keys %{$modules{NTFY_TOPIC}{defptr}}) { - $subscriptions .= $hash->{TOPIC} . ","; + $k=~/^(.*)_(.*)$/; + push(@topics,$2); } - chop $subscriptions; - - readingsSingleUpdate($hash,"subscriptions", $subscriptions,1); + + readingsSingleUpdate($hash,"subscriptions", join(",", @topics),1); } @@ -164,131 +164,14 @@ sub NTFY_newSubscription my $hash = shift; my $topic = shift; - my $thash = {}; - - $thash->{NAME} = makeDeviceName("NTFY_TOPIC_" . $topic); + my $newDeviceName = makeDeviceName($hash->{NAME} . "_" . $topic); + my $token = NFTY_Calc_Auth_Token($hash->{helper}->{PASSWORD},$hash->{USERNAME}); - $thash->{TYPE} = $hash->{TYPE}; - $thash->{NR} = $devcount++; - - $thash->{phash} = $hash; - $thash->{PNAME} = $hash->{NAME}; - - $thash->{TOPIC} = $topic; - $thash->{SERVER} = $hash->{SERVER}; - $thash->{USERNAME} = $hash->{USERNAME} || ""; - $thash->{helper}->{PASSWORD} = $hash->{helper}->{PASSWORD}; - - $thash->{TEMPORARY} = 1; - - my $useSSL = 0; - if ($thash->{SERVER}=~/https/) - { - $useSSL = 1; - } - - my $port = $useSSL == 1 ? 443 : 80; - if ($thash->{SERVER}=~/:(\d+)/) - { - $port = $1; - } - - my $dev = $thash->{SERVER} . ":" . $port . "/" . $thash->{TOPIC} . "/ws"; - - if ($thash->{helper}->{PASSWORD} && length($thash->{helper}->{PASSWORD}) > 0) - { - my $token = NFTY_Calc_Auth_Token($thash->{helper}->{PASSWORD},$thash->{USERNAME}); - if (!$token) - { - NTFY_LOG(LOG_ERROR,"Can not suscribe to topic without valid token"); - return; - } - - $dev .="?auth=" . $token; - } - - # swap http(s) to expected ws(s) - $dev =~ s/^.*:\/\//wss:/; - - # just for debugging purposes - NTFY_LOG(LOG_DEBUG,"using websocket url: " . $dev); - - $thash->{DeviceName}=$dev; - $thash->{WEBSOCKET} = 1; - - $attr{$thash->{NAME}}{room} = 'hidden'; - $defs{$thash->{NAME}} = $thash; - - DevIo_OpenDev( $thash, 0, "NTFY_WS_Handshake", "NTFY_WS_CB" ); - - # remember topic in main hash helper - push(@{$hash->{helper}->{topics}},$thash); + fhem("define $newDeviceName NTFY_TOPIC " . $hash->{SERVER} . " " . $token . " " . $topic); NTFY_Update_Subscriptions_Readings($hash); } -sub NTFY_Topic_To_Hash -{ - my $hash = shift; - my $topic = shift; - - -} - -sub NTFY_WS_Handshake -{ - my $hash = shift; - my $name = $hash->{NAME}; - - DevIo_SimpleWrite( $hash, '', 2 ); - NTFY_LOG(LOG_DEBUG, "websocket connected"); - - readingsSingleUpdate($hash, "state", "online", 1); -} - -sub NTFY_WS_CB -{ - my ($hash, $error) = @_; - - my $name = $hash->{NAME}; - - if ($error) - { - readingsBeginUpdate($hash); - readingsBulkUpdate($hash, "state", "error"); - readingsBulkUpdate($hash, "error", $error); - readingsEndUpdate($hash,1); - } - - NTFY_LOG(LOG_ERROR, "error while connecting to websocket: $error ") if $error; - NTFY_LOG(LOG_DEBUG, "websocket callback called"); - return $error; - -} - -sub NTFY_Read -{ - my ( $hash ) = @_; - - my $buf = DevIo_SimpleRead($hash); - - return unless length($buf) > 0; - - my $msg = from_json($buf); - - return unless $msg->{event} eq "message"; - - my $nrReceivedMessages = ReadingsVal($hash->{phash},"nrReceivedMessages",0); - + - $nrReceivedMessages++; - readingsBeginUpdate($hash->{phash}); - readingsBulkUpdateIfChanged($hash->{phash},"nrReceivedMessages",$nrReceivedMessages++); - readingsBulkUpdateIfChanged($hash->{phash}, "lastReceivedTopic", $msg->{topic}); - readingsEndUpdate($hash->{phash},1); - - NTFY_LOG(LOG_DEBUG, $hash->{NAME} . " received: " . $buf); - -} sub NTFY_Publish_Msg { my $hash = shift; @@ -458,9 +341,14 @@ sub NTFY_Set return undef; } + elsif ($cmd eq "subscribe") + { + NTFY_LOG(LOG_DEBUG, "full command: " . join(' ', @args)); + NTFY_newSubscription($hash, $args[0]); + } else { - return "Unknown argument $cmd, choose one of publish" + return "Unknown argument $cmd, choose one of publish subscribe" } } @@ -471,6 +359,55 @@ sub NTFY_Attr return undef; } +# +# Process the incoming notifications/ messages +# +sub NTFY_Process_Message +{ + my $hash = shift; + my $msg = shift; + + my $msgData = from_json($msg); + + return unless $msgData->{event} eq "message"; + + my $nrReceivedMessages = ReadingsVal($hash->{NAME}, "nrReceivedMessages", 0); + $nrReceivedMessages++; + + readingsBeginUpdate($hash); + readingsBulkUpdateIfChanged($hash,"nrReceivedMessages",$nrReceivedMessages); + readingsBulkUpdateIfChanged($hash,"lastReceivedTopic",$msgData->{topic}) unless !$msgData->{topic}; + readingsBulkUpdateIfChanged($hash,"lastReceivedTitle",$msgData->{title}) unless !$msgData->{title}; + readingsBulkUpdateIfChanged($hash,"lastReceivedData", $msgData->{message}) unless !$msgData->{message}; + readingsBulkUpdateIfChanged($hash,"lastReceivedRawMessage", $msg); + readingsEndUpdate($hash,1); + + +} + +# +# Parse incoming notifications from websocket clients +# +sub NTFY_Parse ($$) +{ + my ( $ioHash, $message) = @_; + + return unless (substr($message,0,5) eq "NTFY:"); + + $message=~/^NTFY:(.*)---(.*)$/; + + my $server = $1; + my $msg = $2; + + if(my $hash = $modules{NTFY_CLIENT}{defptr}{$server}) + { + NTFY_Update_Subscriptions_Readings($hash); + NTFY_Process_Message($hash, $msg); + return $hash->{NAME}; + } + + return; +} 1; @@ -537,6 +474,12 @@ sub NTFY_Attr Everything without a prefix is considered the messages and is concatenated. +