-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrud_app.py
116 lines (95 loc) · 3.19 KB
/
crud_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from flask import Flask, jsonify, request
import psycopg2
from kafka import KafkaProducer
import json
app = Flask(__name__)
# Database connection configuration
db_host = 'localhost'
db_port = 5432
db_name = 'kafka'
db_user = 'data_eng'
db_password = 'data_eng'
# Kafka producer configuration
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Create a connection to the database
conn = psycopg2.connect(
host=db_host,
port=db_port,
dbname=db_name,
user=db_user,
password=db_password
)
# API endpoint to create a new user
@app.route('/users', methods=['POST'])
def create_user():
try:
# Extract user information from the request
user_data = request.get_json()
id = int(user_data['id'])
name = user_data['name']
# Insert the new user into the database
cursor = conn.cursor()
query = f"INSERT INTO users (id, name) VALUES ({id}, '{name}') RETURNING id"
cursor.execute(query)
conn.commit()
# Publish user_created event to Kafka
event_data = {'id': id, 'name': name}
producer.send('user_created', value=event_data)
return jsonify({'id':id}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
# API endpoint to get user information
@app.route('/users/<user_id>', methods=['GET'])
def get_user(user_id):
try:
# Retrieve user information from the database
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE id = {user_id}"
cursor.execute(query)
user = cursor.fetchone()
if user:
# Return user information as JSON
return jsonify({
'id': user[0],
'name': user[1]
})
else:
return jsonify({'error': 'User not found'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
# API endpoint to update user information
@app.route('/users/<user_id>', methods=['PUT'])
def update_user(user_id):
try:
# Extract updated user information from the request
user_data = request.get_json()
name = user_data['name']
# Update user information in the database
cursor = conn.cursor()
query = f"UPDATE users SET name = '{name}' WHERE id = {user_id}"
cursor.execute(query)
conn.commit()
# Publish user_updated event to Kafka
event_data = {'id': int(user_id), 'name': name}
producer.send('user_updated', value=event_data)
return jsonify({'message': 'User updated successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# API endpoint to delete a user
@app.route('/users/<user_id>', methods=['DELETE'])
def delete_user(user_id):
try:
# Delete the user from the database
cursor = conn.cursor()
query = f"DELETE FROM users WHERE id = {user_id}"
cursor.execute(query)
conn.commit()
return jsonify({'message': 'User deleted successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run()