-
Notifications
You must be signed in to change notification settings - Fork 114
/
Copy pathLocation_based_services.py
123 lines (105 loc) · 5.25 KB
/
Location_based_services.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import json
import pyttsx3
import speech_recognition as sr
import aiohttp # For async API requests
import geopy.distance # To calculate distance between coordinates
# Initialize TTS engine
engine = pyttsx3.init()
engine.setProperty('rate', 150)
# Function to convert text to speech asynchronously
async def speak_async(text):
engine.say(text)
engine.runAndWait()
await asyncio.sleep(0.5) # Add a delay after speaking
# Function to get user input through speech recognition
async def voice_to_text_async():
recognizer = sr.Recognizer()
await asyncio.sleep(0.5) # Ensure TTS finishes before listening
with sr.Microphone() as source:
print("Listening for your command...")
try:
audio = recognizer.listen(source, timeout=10)
command = recognizer.recognize_google(audio)
return command
except sr.UnknownValueError:
await speak_async("I didn't catch that. Could you please repeat?")
return None
except sr.RequestError:
await speak_async("The speech recognition service is unavailable at the moment.")
return None
# Simulate getting current location (replace this with a real GPS location API)
def get_current_location():
return {"lat": 40.712776, "lng": -74.005974} # New York City coordinates
# Asynchronous function to fetch nearby services using aiohttp
async def get_nearby_services(service_type):
location = get_current_location()
latitude = location["lat"]
longitude = location["lng"]
api_key = "your_actual_google_places_api_key"
url = f"https://maps.googleapis.com/maps/api/place/nearbysearch/json?location={latitude},{longitude}&radius=1500&type={service_type}&key={api_key}"
async with aiohttp.ClientSession() as session:
try:
async with session.get(url) as response:
if response.status == 200:
places = await response.json()
results = places.get('results', [])
if results:
await speak_async(f"Here are some {service_type} nearby:")
for place in results[:3]:
name = place.get('name', 'Unknown place')
await speak_async(f"{name}")
else:
await speak_async(f"Sorry, I couldn't find any {service_type} nearby.")
else:
await speak_async(f"Failed to retrieve nearby services. Error Code: {response.status}")
except aiohttp.ClientError as e:
await speak_async(f"An error occurred: {str(e)}")
# Geofencing Alert: Set location-based reminders
async def set_geofencing_alert(reminder, target_location):
current_location = get_current_location()
target_coords = (target_location.get('lat'), target_location.get('lng'))
if current_location and target_coords:
try:
distance_to_target = geopy.distance.distance((current_location['lat'], current_location['lng']), target_coords).km
if distance_to_target < 0.5:
await speak_async(f"Reminder: {reminder}")
else:
await speak_async(f"Geofencing reminder set. Distance: {distance_to_target:.2f} km.")
except ValueError:
await speak_async("Invalid coordinates provided for geofencing.")
else:
await speak_async("Unable to get current location or target location.")
# Background geofencing monitoring loop
async def geofencing_monitor(reminder, target_location, interval=60):
while True:
await set_geofencing_alert(reminder, target_location)
await asyncio.sleep(interval)
# Example store location (replace this with a real location if necessary)
store_location = {"name": "Local Grocery Store", "lat": 40.713776, "lng": -74.006974}
# Main function for the assistant
async def main():
await speak_async("Welcome! How can I assist you today?")
while True:
command = await voice_to_text_async()
if command:
if "nearby" in command.lower() and any(service in command.lower() for service in ["restaurant", "gas station", "hospital", "ATM"]):
service_type = None
if "restaurant" in command.lower():
service_type = "restaurant"
elif "gas station" in command.lower():
service_type = "gas_station"
elif "hospital" in command.lower():
service_type = "hospital"
elif "ATM" in command.lower():
service_type = "atm"
if service_type:
await get_nearby_services(service_type)
elif "remind me" in command.lower() and "near store" in command.lower():
await speak_async("Starting geofencing reminder for store.")
asyncio.create_task(geofencing_monitor("pick up groceries", store_location))
elif "exit" in command.lower():
await speak_async("Goodbye!")
break
if __name__ == "__main__":
asyncio.run(main())