-
Notifications
You must be signed in to change notification settings - Fork 11
/
spotify-sync.py
147 lines (117 loc) · 5.11 KB
/
spotify-sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import re
import time
import logging
from plexapi.server import PlexServer
from plexapi.audio import Track
import spotipy
import os
from spotipy.oauth2 import SpotifyClientCredentials
from typing import List
def filterPlexArray(plexItems=None, song="", artist="") -> List[Track]:
    """Return the Plex tracks that match a song title and artist name.

    An item is kept only when it is a ``Track``, its title equals *song*
    and the title of its artist equals *artist*; both comparisons are
    case-insensitive.

    Args:
        plexItems: Plex search results to filter. ``None`` or an empty
            sequence yields an empty list. The input is never modified.
        song: Track title to match.
        artist: Artist name to match.

    Returns:
        A new list with the matching tracks, in their original order.
    """
    if not plexItems:
        return []

    # Lower-case the targets once instead of once per candidate.
    wantedSong = song.lower()
    wantedArtist = artist.lower()

    matches = []
    for item in plexItems:
        # Search results may contain other media types (albums, artists, ...).
        if not isinstance(item, Track):
            continue
        if item.title.lower() != wantedSong:
            continue
        artistItem = item.artist()
        # Guard against a track whose artist cannot be resolved.
        if artistItem is None or artistItem.title.lower() != wantedArtist:
            continue
        matches.append(item)
    return matches
def getSpotifyPlaylist(sp: spotipy.client, userId: str, playlistId: str) -> []:
    """Fetch one playlist of a user through the Spotify client.

    Returns whatever ``sp.user_playlist(userId, playlistId)`` returns.
    """
    return sp.user_playlist(userId, playlistId)
def getSpotifyUserPlaylists(sp: spotipy.client, userId: str) -> []:
    """Return a list of Spotify playlist objects owned by *userId*.

    Walks every page returned by ``sp.user_playlists`` and, for each entry
    whose owner id equals *userId*, fetches the playlist again through
    ``getSpotifyPlaylist`` and collects the result.
    """
    ownedPlaylists = []
    page = sp.user_playlists(userId)
    while page:
        ownedPlaylists.extend(
            getSpotifyPlaylist(sp, userId, summary['id'])
            for summary in page['items']
            if summary['owner']['id'] == userId
        )
        # Follow the pagination link until there is none left.
        page = sp.next(page) if page['next'] else None
    return ownedPlaylists
def getSpotifyTracks(sp: spotipy.client, playlist: []) -> []:
    """Collect every track item of a Spotify playlist object.

    Starts with the items embedded under ``playlist['tracks']`` and keeps
    requesting the following page via ``sp.next`` while the current page
    has a ``next`` link.
    """
    page = playlist['tracks']
    collected = list(page['items'])
    while page['next']:
        page = sp.next(page)
        collected += page['items']
    return collected
def _searchPlexTracks(plex, title, attempts=2):
    """Search Plex for tracks named *title*, retrying when the request fails.

    Returns the search results, or ``None`` when every attempt raised.
    """
    for attempt in range(1, attempts + 1):
        try:
            return plex.search(title, mediatype='track')
        except Exception:
            # Best effort: a failed lookup must not abort the whole sync,
            # but KeyboardInterrupt/SystemExit are allowed to propagate.
            logging.warning("Issue making plex request (attempt %d of %d)",
                            attempt, attempts, exc_info=True)
    return None


def getPlexTracks(plex: PlexServer, spotifyTracks: []) -> List[Track]:
    """Find the Plex track matching each Spotify playlist item.

    For every item the Plex server is searched by track name and the
    results are narrowed with ``filterPlexArray`` using the name of the
    first listed artist. The first remaining match is kept.

    Args:
        plex: Plex server to search.
        spotifyTracks: Playlist items, each a mapping with a ``'track'``
            entry that holds ``'name'`` and ``'artists'``.

    Returns:
        The matched Plex tracks in playlist order. Items without usable
        track metadata, without a match, or whose search failed are
        skipped.
    """
    plexTracks = []
    for spotifyTrack in spotifyTracks:
        track = spotifyTrack.get('track')
        # Playlist items can carry a null track (e.g. removed or
        # unavailable entries) or lack a name/artist; nothing to search for.
        if not track or not track.get('name') or not track.get('artists'):
            logging.info("Skipping playlist item without usable track metadata")
            continue

        name = track['name']
        artist = track['artists'][0]['name']
        logging.info("Searching Plex for: %s by %s", name, artist)

        musicTracks = _searchPlexTracks(plex, name)
        if not musicTracks:
            continue

        plexMusic = filterPlexArray(musicTracks, name, artist)
        if plexMusic:
            logging.info("Found Plex Song: %s by %s", name, artist)
            plexTracks.append(plexMusic[0])
    return plexTracks
def createPlaylist(plex: PlexServer, sp: spotipy.Spotify, playlist: []):
    """Mirror one Spotify playlist into Plex.

    The Plex playlist is titled ``"<owner> - <playlist name>"``. When a
    playlist with that title already exists, only the matched tracks that
    are not yet part of it are added; otherwise a new playlist is created
    from all matched tracks. Nothing happens when no track could be
    matched in Plex.

    Args:
        plex: Plex server that receives the playlist.
        sp: Spotify client used to page through the playlist's tracks.
        playlist: Spotify playlist object (``'owner'``, ``'name'``,
            ``'tracks'``).
    """
    owner = playlist['owner']
    # display_name may be missing/null; fall back to the owner id so the
    # title can still be built.
    ownerName = owner.get('display_name') or owner['id']
    playlistName = ownerName + " - " + playlist['name']
    logging.info('Starting playlist %s' % playlistName)

    plexTracks = getPlexTracks(plex, getSpotifyTracks(sp, playlist))
    if not plexTracks:
        return

    # Only the lookup is guarded: a failed lookup is treated as "playlist
    # does not exist yet". A failing addItems must NOT fall through to
    # creating a second playlist with the same title.
    try:
        plexPlaylist = plex.playlist(playlistName)
    except Exception:
        plexPlaylist = None

    if plexPlaylist is None:
        logging.info("Creating playlist %s" % playlistName)
        # Keyword argument keeps the call valid regardless of where
        # `items` sits in the installed plexapi signature.
        plex.createPlaylist(playlistName, items=plexTracks)
        return

    logging.info('Updating playlist %s' % playlistName)
    # The sync runs repeatedly; re-adding every matched track each time
    # would pile up duplicate entries, so add only what is missing.
    knownKeys = {item.ratingKey for item in plexPlaylist.items()}
    newTracks = []
    for plexTrack in plexTracks:
        if plexTrack.ratingKey not in knownKeys:
            knownKeys.add(plexTrack.ratingKey)
            newTracks.append(plexTrack)
    if newTracks:
        plexPlaylist.addItems(newTracks)
def parseSpotifyURI(uriString: str) -> dict:
    """Split a Spotify URI into a mapping of its key/value segments.

    The optional ``spotify:`` prefix is dropped and the remaining
    colon-separated segments are paired up, so
    ``"spotify:user:alice:playlist:abc"`` becomes
    ``{"user": "alice", "playlist": "abc"}``.

    Surrounding whitespace is ignored (values usually come from a
    comma-separated environment variable). A trailing key without a value
    is dropped, and an empty string yields an empty dict instead of
    raising ``IndexError``.

    Args:
        uriString: The URI to parse.

    Returns:
        Dict mapping each key segment to the segment that follows it.
    """
    segments = re.sub(r'^spotify:', '', uriString.strip()).split(":")
    # Even positions are keys, odd positions their values; zip() stops at
    # the shorter slice, which discards a dangling key.
    return dict(zip(segments[0::2], segments[1::2]))
def runSync(plex: PlexServer, sp: spotipy.Spotify, spotifyURIs: List[dict]):
    """Run one sync pass from Spotify to Plex.

    Each entry of *spotifyURIs* is a mapping as produced by
    ``parseSpotifyURI``. An entry with ``'user'`` and ``'playlist'``
    selects that single playlist; an entry with only ``'user'`` selects
    every playlist owned by that user. Entries without ``'user'`` are not
    supported and are skipped with a warning. Every selected playlist is
    then handed to ``createPlaylist``.

    Args:
        plex: Plex server that receives the playlists.
        sp: Spotify client used for all Spotify requests.
        spotifyURIs: Parsed Spotify URIs to sync.
    """
    logging.info('Starting a Sync Operation')
    playlists = []
    for spotifyUriParts in spotifyURIs:
        if 'user' not in spotifyUriParts:
            # Make the skip visible instead of silently ignoring the entry.
            logging.warning('Skipping unsupported Spotify URI (no user): %s',
                            spotifyUriParts)
            continue

        userId = spotifyUriParts['user']
        if 'playlist' in spotifyUriParts:
            playlistId = spotifyUriParts['playlist']
            # Argument order matches the message: playlist first, then user.
            logging.info('Getting playlist %s from user %s' % (playlistId, userId))
            playlists.append(getSpotifyPlaylist(sp, userId, playlistId))
        else:
            logging.info('Getting playlists for %s' % userId)
            playlists.extend(getSpotifyUserPlaylists(sp, userId))

    for playlist in playlists:
        createPlaylist(plex, sp, playlist)
    logging.info('Finished a Sync Operation')
if __name__ == '__main__':
    # Script entry point. Configuration comes from the environment:
    #   SPOTIFY_URIS     comma-separated Spotify URIs to sync (required)
    #   SECONDS_TO_WAIT  pause between sync passes, default 1800
    #   PLEX_URL         base URL passed to PlexServer
    #   PLEX_TOKEN       token passed to PlexServer
    logging.basicConfig(level=logging.INFO)

    spotifyUris = os.environ.get('SPOTIFY_URIS')
    if not spotifyUris:
        # Nothing to sync; stop here instead of failing later on
        # None.split() or on parsing an empty URI.
        logging.error("No spotify uris")
        raise SystemExit(1)

    secondsToWait = int(os.environ.get('SECONDS_TO_WAIT', 1800))
    baseurl = os.environ.get('PLEX_URL')
    token = os.environ.get('PLEX_TOKEN')
    plex = PlexServer(baseurl, token)

    client_credentials_manager = SpotifyClientCredentials()
    sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

    # Parse once up front; blank entries (e.g. from a trailing comma) are
    # dropped before parsing.
    spotifyMainUris = [
        parseSpotifyURI(spotifyUri)
        for spotifyUri in spotifyUris.split(",")
        if spotifyUri.strip()
    ]

    # Sync forever, sleeping between passes.
    while True:
        runSync(plex, sp, spotifyMainUris)
        time.sleep(secondsToWait)