· 5 years ago · Dec 10, 2020, 05:24 PM
1package lila.streamer
2
3import akka.actor._
4import org.joda.time.DateTime
5import play.api.libs.json._
6import play.api.libs.ws.WSClient
7import scala.concurrent.duration._
8import scala.util.chaining._
9
10import lila.common.Bus
11import lila.common.config.Secret
12import lila.user.User
13
/** Actor that keeps an in-memory snapshot of the live streams of listed streamers.
  *
  * On every `Tick` it queries Twitch and YouTube, merges and de-duplicates the
  * results, stores them in `liveStreams`, and notifies the timeline and the bus
  * about streams that just started. A new `Tick` is scheduled 15 seconds after
  * each update (via `addEffectAnyway`, presumably whether the update succeeded
  * or failed -- confirm against that helper).
  *
  * Replies to `Streaming.Get` with the current `LiveStreams`.
  *
  * NOTE(review): `liveStreams` and `prevYouTubeStreams` are assigned from
  * `Future` callbacks rather than from `receive`; confirm this is acceptable
  * for this actor.
  *
  * @param ws                HTTP client used for the Twitch and YouTube requests
  * @param api               streamer storage (listed ids, lookup by ids, "live now" flag)
  * @param isOnline          tells whether a user is currently online on the site
  * @param timeline          recipient of the "stream started" timeline entries
  * @param keyword           keyword used for the YouTube search and for filtering results
  * @param alwaysFeatured    names passed, lower-cased, to the Twitch result filter
  * @param googleApiKey      YouTube API key; YouTube polling is skipped when empty
  * @param twitchCredentials `(clientId, bearerToken)`; Twitch polling is skipped when either is empty
  */
final private class Streaming(
    ws: WSClient,
    api: StreamerApi,
    isOnline: User.ID => Boolean,
    timeline: lila.hub.actors.Timeline,
    keyword: Stream.Keyword,
    alwaysFeatured: () => lila.common.Strings,
    googleApiKey: Secret,
    twitchCredentials: () => (String, String)
) extends Actor {

  import Stream._
  import Twitch.Reads._
  import YouTube.Reads._

  private case object Tick

  // Latest known set of live streams; replaced wholesale by publishStreams.
  private var liveStreams = LiveStreams(Nil)

  implicit def ec: scala.concurrent.ExecutionContextExecutor = context.dispatcher

  def receive: Receive = {

    case Streaming.Get => sender() ! liveStreams

    case Tick => updateStreams addEffectAnyway scheduleTick
  }

  // Schedules the next poll 15 seconds from now.
  private def scheduleTick: Cancellable =
    context.system.scheduler.scheduleOnce(15.seconds, self, Tick)

  // Kick off the first poll as soon as the actor is constructed.
  self ! Tick

  /** Runs one full polling cycle.
    *
    * Only streamers that are already live or currently online are looked up.
    * Twitch results come first in the merged list, so when a streamer is live
    * on both services the Twitch stream is the one kept by `dedupStreamers`.
    * The final list is shuffled.
    */
  def updateStreams: Funit =
    for {
      streamerIds <- api.allListedIds
      activeIds = streamerIds.filter { id =>
        liveStreams.has(id) || isOnline(id.value)
      }
      streamers <- api byIds activeIds
      (twitchStreams, youTubeStreams) <-
        fetchTwitchStreams(streamers, 0, None, Nil) zip fetchYouTubeStreams(streamers)
      streams = LiveStreams {
        scala.util.Random.shuffle {
          (twitchStreams ::: youTubeStreams) pipe dedupStreamers
        }
      }
      _ <- api.setLiveNow(streamers.filter(streams.has).map(_.id))
    } yield publishStreams(streamers, streams)

  /** Stores `newStreams` as the current snapshot.
    *
    * For every stream whose streamer was not live in the previous snapshot, a
    * timeline entry is propagated to the streamer's followers and a
    * `StreamStart` event is published on the "streamStart" bus channel.
    * Afterwards a presence counter is incremented for each streamer that is
    * live on Twitch and/or YouTube.
    *
    * @param streamers  streamers considered during this polling cycle
    * @param newStreams streams found live during this polling cycle
    */
  def publishStreams(streamers: List[Streamer], newStreams: LiveStreams): Unit = {
    if (newStreams != liveStreams) {
      newStreams.streams filterNot { s =>
        liveStreams has s.streamer
      } foreach { s =>
        timeline ! {
          import lila.hub.actorApi.timeline.{ Propagate, StreamStart }
          Propagate(StreamStart(s.streamer.userId, s.streamer.name.value)) toFollowersOf s.streamer.userId
        }
        Bus.publish(
          lila.hub.actorApi.streamer.StreamStart(s.streamer.userId),
          "streamStart"
        )
      }
    }
    liveStreams = newStreams
    streamers foreach { streamer =>
      streamer.twitch.foreach { t =>
        if (liveStreams.streams.exists(s => s.serviceName == "twitch" && s.is(streamer)))
          lila.mon.tv.streamer.present(s"${t.userId}@twitch").increment()
      }
      streamer.youTube.foreach { t =>
        if (liveStreams.streams.exists(s => s.serviceName == "youTube" && s.is(streamer)))
          lila.mon.tv.streamer.present(s"${t.channelId}@youtube").increment()
      }
    }
  }

  /** Fetches live Twitch streams for the hard-coded game id, one page at a time.
    *
    * Recurses while the response contains data and a pagination cursor, up to
    * 10 pages. The streams of every fetched page are accumulated, including
    * the last one, which carries no cursor. HTTP or JSON failures are logged
    * and treated as an empty page, which ends the pagination with whatever was
    * accumulated so far.
    *
    * @param streamers  streamers the results are matched against
    * @param page       zero-based index of the page about to be fetched
    * @param pagination pagination info of the previous page, if any
    * @param acc        streams accumulated from previous pages
    * @return all matching streams found; `acc` unchanged when the credentials
    *         are empty or the page limit is reached
    */
  def fetchTwitchStreams(
      streamers: List[Streamer],
      page: Int,
      pagination: Option[Twitch.Pagination],
      acc: List[Twitch.Stream]
  ): Fu[List[Twitch.Stream]] = {
    val (clientId, secret) = twitchCredentials()
    if (clientId.nonEmpty && secret.nonEmpty && page < 10) {
      val query = List(
        "game_id" -> "368914", // shogi
        "first"   -> "100"     // max results per page
      ) ::: pagination.flatMap(_.cursor).map("after" -> _).toList
      ws.url("https://api.twitch.tv/helix/streams")
        .withQueryStringParameters(query: _*)
        .withHttpHeaders(
          "Client-ID"     -> clientId,
          "Authorization" -> s"Bearer $secret"
        )
        .get()
        .flatMap {
          case res if res.status == 200 =>
            res.json.validate[Twitch.Result](twitchResultReads) match {
              case JsSuccess(result, _) => fuccess(result)
              case JsError(err)         => fufail(s"twitch $err ${lila.log http res}")
            }
          case res => fufail(s"twitch ${lila.log http res}")
        }
        .recover {
          case e: Exception =>
            logger.warn(e.getMessage)
            Twitch.Result(None, None)
        }
        .monSuccess(_.tv.streamer.twitch)
        .flatMap { result =>
          if (result.data.exists(_.nonEmpty)) {
            // Always keep the page we just received: the last page has data but no cursor.
            val collected = acc ::: result.streams(
              keyword,
              streamers,
              alwaysFeatured().value.map(_.toLowerCase)
            )
            if (result.pagination.flatMap(_.cursor).isDefined)
              fetchTwitchStreams(streamers, page + 1, result.pagination, collected)
            else fuccess(collected)
          } else fuccess(acc)
        }
    } else fuccess(acc)
  }

  // Last YouTube result together with the time it was obtained; used as a cache.
  private var prevYouTubeStreams = YouTube.StreamsFetched(Nil, DateTime.now)

  /** Fetches live YouTube streams matching `keyword`, with a small cache.
    *
    * The previous result is reused while it is younger than 1 minute, or
    * younger than 3 minutes when it was empty. Request or JSON failures are
    * logged and produce an empty result stamped with the current time.
    *
    * @param streamers streamers of this polling cycle; only those with a
    *                  YouTube account are matched against the results
    * @return the matching streams; the `??` default (presumably an empty list)
    *         when no streamer has a YouTube account or the API key is empty
    */
  def fetchYouTubeStreams(streamers: List[Streamer]): Fu[List[YouTube.Stream]] = {
    val youtubeStreamers = streamers.filter(_.youTube.isDefined)
    (youtubeStreamers.nonEmpty && googleApiKey.value.nonEmpty) ?? {
      val now = DateTime.now
      // An empty previous result is kept longer than a non-empty one.
      val cacheMinutes = if (prevYouTubeStreams.list.isEmpty) 3 else 1
      val fetched =
        if (prevYouTubeStreams.at.isAfter(now minusMinutes cacheMinutes))
          fuccess(prevYouTubeStreams)
        else {
          ws.url("https://www.googleapis.com/youtube/v3/search")
            .withQueryStringParameters(
              "part"      -> "snippet",
              "type"      -> "video",
              "eventType" -> "live",
              "q"         -> keyword.value,
              "key"       -> googleApiKey.value
            )
            .get()
            .flatMap { response =>
              response.json.validate[YouTube.Result](youtubeResultReads) match {
                case JsSuccess(data, _) =>
                  fuccess(YouTube.StreamsFetched(data.streams(keyword, youtubeStreamers), now))
                case JsError(err) =>
                  fufail(s"youtube ${response.status} $err ${response.body.take(500)}")
              }
            }
            .monSuccess(_.tv.streamer.youTube)
            .recover {
              case e: Exception =>
                logger.warn(e.getMessage)
                YouTube.StreamsFetched(Nil, now)
            }
        }
      fetched dmap { r =>
        prevYouTubeStreams = r
        r.list
      }
    }
  }

  /** Keeps only the first stream of each streamer.
    *
    * Streams are prepended while folding, so the result is in reverse order of
    * first appearance.
    *
    * @param streams streams possibly containing several entries per streamer
    * @return at most one stream per streamer id
    */
  def dedupStreamers(streams: List[Stream]): List[Stream] =
    streams
      .foldLeft((Set.empty[Streamer.Id], List.empty[Stream])) {
        case ((streamerIds, streams), stream) if streamerIds(stream.streamer.id) => (streamerIds, streams)
        case ((streamerIds, streams), stream) => (streamerIds + stream.streamer.id, stream :: streams)
      }
      ._2
}
194
/** Messages understood by the `Streaming` actor. */
object Streaming {

  /** Request for the current live streams; the actor replies to the sender with its latest snapshot. */
  case object Get
}
199